Skip to content

Commit

Permalink
[core][performance] Only do PlasmaReleaseReply when client knows it's…
Browse files Browse the repository at this point in the history
… fallback allocated. (#41842) (#41925)

This is a performance fix for #40370.

Previously the plasma client sends a PlasmaReleaseRequest and does not wait for a reply. This causes the client to never know when it needs to unmap a fallback-allocated mmap. #40370 fixed it by adding back the PlasmaReleaseReply that says should_unmap and client unmaps. However this is in hot path of an object release, and most object releases are on main memory but still pays for this extra RTT.

This PR fixes by sharing more info: at Get/Create time, server notifies the client that this object is fallback_allocated if it lives on such a mmap. Then at Release time, the reply only happens if object is fallback_allocated. In hot path (main memory release), the reply is skipped so we no longer pay for the RTT.

Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
  • Loading branch information
rynewang authored Dec 15, 2023
1 parent 2dbb778 commit 8c63f91
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 30 deletions.
46 changes: 27 additions & 19 deletions src/ray/object_manager/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -594,29 +594,37 @@ Status PlasmaClient::Impl::Release(const ObjectID &object_id) {
// Check if the client is no longer using this object.
if (object_entry->second->count == 0) {
// object_entry is invalidated in MarkObjectUnused, need to read the fd beforehand.
MEMFD_TYPE fd = object_entry->second->object.store_fd;
// If the fd may be unmapped, we wait for the plasma server to send a ReleaseReply.
// Otherwise, skip the reply to boost performance.
// Q: since both server and client knows this fd is fallback allocated, why do we
// need to pass it in PlasmaReleaseRequest?
// A: becuase we wanna be idempotent, and in the 2nd call, the server does not know
// about the object.
const MEMFD_TYPE fd = object_entry->second->object.store_fd;
bool may_unmap = object_entry->second->object.fallback_allocated;
// Tell the store that the client no longer needs the object.
RAY_RETURN_NOT_OK(MarkObjectUnused(object_id));
RAY_RETURN_NOT_OK(SendReleaseRequest(store_conn_, object_id));
std::vector<uint8_t> buffer;
RAY_RETURN_NOT_OK(
PlasmaReceive(store_conn_, MessageType::PlasmaReleaseReply, &buffer));
ObjectID released_object_id;

// `should_unmap` is set to true by the plasma server, when the mmap section is
// fallback-allocated and is no longer used. i.e. if the object ID is in the main
// memory, this boolean is always false.
bool should_unmap;
RAY_RETURN_NOT_OK(ReadReleaseReply(
buffer.data(), buffer.size(), &released_object_id, &should_unmap));
if (should_unmap) {
auto mmap_entry = mmap_table_.find(fd);
// Release call is idempotent: if we already released, it's ok.
if (mmap_entry != mmap_table_.end()) {
mmap_table_.erase(mmap_entry);
RAY_RETURN_NOT_OK(SendReleaseRequest(store_conn_, object_id, may_unmap));
if (may_unmap) {
// Now, since the object release may unmap the mmap, we wait for a reply.
std::vector<uint8_t> buffer;
RAY_RETURN_NOT_OK(
PlasmaReceive(store_conn_, MessageType::PlasmaReleaseReply, &buffer));
ObjectID released_object_id;

// `should_unmap` is set to true by the plasma server, when the mmap section is
// fallback-allocated and is no longer used.
bool should_unmap;
RAY_RETURN_NOT_OK(ReadReleaseReply(
buffer.data(), buffer.size(), &released_object_id, &should_unmap));
if (should_unmap) {
auto mmap_entry = mmap_table_.find(fd);
// Release call is idempotent: if we already released, it's ok.
if (mmap_entry != mmap_table_.end()) {
mmap_table_.erase(mmap_entry);
}
}
}

auto iter = deletion_cache_.find(object_id);
if (iter != deletion_cache_.end()) {
deletion_cache_.erase(object_id);
Expand Down
1 change: 1 addition & 0 deletions src/ray/object_manager/plasma/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ class LocalObject {
object->metadata_size = GetObjectInfo().metadata_size;
object->device_num = GetAllocation().device_num;
object->mmap_size = GetAllocation().mmap_size;
object->fallback_allocated = GetAllocation().fallback_allocated;
}

private:
Expand Down
5 changes: 5 additions & 0 deletions src/ray/object_manager/plasma/plasma.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ struct PlasmaObjectSpec {
metadata_offset: ulong;
// The size in bytes of the metadata.
metadata_size: ulong;
// Whether this object is on a fallback allocated mmap.
fallback_allocated: bool;
// Device to create buffer on.
device_num: int;
}
Expand Down Expand Up @@ -231,6 +233,9 @@ table PlasmaGetReply {
table PlasmaReleaseRequest {
// ID of the object to be released.
object_id: string;
// Client concerns this request may unmap a mmap region, and will wait for a reply.
// If false, the server should not send a PlasmaReleaseReply.
may_unmap: bool;
}

table PlasmaReleaseReply {
Expand Down
5 changes: 4 additions & 1 deletion src/ray/object_manager/plasma/plasma.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,15 @@ struct PlasmaObject {
int device_num;
/// Set if device_num is equal to 0.
int64_t mmap_size;
/// If the object is fallback_allocated. False means it's on main memory.
bool fallback_allocated;

bool operator==(const PlasmaObject &other) const {
return ((store_fd == other.store_fd) && (data_offset == other.data_offset) &&
(metadata_offset == other.metadata_offset) &&
(data_size == other.data_size) && (metadata_size == other.metadata_size) &&
(device_num == other.device_num));
(device_num == other.device_num) &&
(fallback_allocated == other.fallback_allocated));
}
};

Expand Down
17 changes: 13 additions & 4 deletions src/ray/object_manager/plasma/protocol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ Status SendCreateReply(const std::shared_ptr<Client> &client,
object.data_size,
object.metadata_offset,
object.metadata_size,
object.fallback_allocated,
object.device_num);
auto object_string = fbb.CreateString(object_id.Binary());
fb::PlasmaCreateReplyBuilder crb(fbb);
Expand Down Expand Up @@ -304,6 +305,7 @@ Status ReadCreateReply(uint8_t *data,
object->data_size = message->plasma_object()->data_size();
object->metadata_offset = message->plasma_object()->metadata_offset();
object->metadata_size = message->plasma_object()->metadata_size();
object->fallback_allocated = message->plasma_object()->fallback_allocated();

store_fd->first = INT2FD(message->store_fd());
store_fd->second = message->unique_fd_id();
Expand Down Expand Up @@ -378,18 +380,23 @@ Status ReadSealReply(uint8_t *data, size_t size, ObjectID *object_id) {
// Release messages.

Status SendReleaseRequest(const std::shared_ptr<StoreConn> &store_conn,
ObjectID object_id) {
ObjectID object_id,
bool may_unmap) {
flatbuffers::FlatBufferBuilder fbb;
auto message =
fb::CreatePlasmaReleaseRequest(fbb, fbb.CreateString(object_id.Binary()));
auto message = fb::CreatePlasmaReleaseRequest(
fbb, fbb.CreateString(object_id.Binary()), may_unmap);
return PlasmaSend(store_conn, MessageType::PlasmaReleaseRequest, &fbb, message);
}

Status ReadReleaseRequest(uint8_t *data, size_t size, ObjectID *object_id) {
Status ReadReleaseRequest(uint8_t *data,
size_t size,
ObjectID *object_id,
bool *may_unmap) {
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaReleaseRequest>(data);
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::FromBinary(message->object_id()->str());
*may_unmap = message->may_unmap();
return Status::OK();
}

Expand Down Expand Up @@ -618,6 +625,7 @@ Status SendGetReply(const std::shared_ptr<Client> &client,
object.data_size,
object.metadata_offset,
object.metadata_size,
object.fallback_allocated,
object.device_num));
}
std::vector<int> store_fds_as_int;
Expand Down Expand Up @@ -658,6 +666,7 @@ Status ReadGetReply(uint8_t *data,
plasma_objects[i].data_size = object->data_size();
plasma_objects[i].metadata_offset = object->metadata_offset();
plasma_objects[i].metadata_size = object->metadata_size();
plasma_objects[i].fallback_allocated = object->fallback_allocated();
plasma_objects[i].device_num = object->device_num();
}
RAY_CHECK(message->store_fds()->size() == message->mmap_sizes()->size());
Expand Down
8 changes: 6 additions & 2 deletions src/ray/object_manager/plasma/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,13 @@ Status ReadGetReply(uint8_t *data,
/* Plasma Release message functions. */

Status SendReleaseRequest(const std::shared_ptr<StoreConn> &store_conn,
ObjectID object_id);
ObjectID object_id,
bool may_unmap);

Status ReadReleaseRequest(uint8_t *data, size_t size, ObjectID *object_id);
Status ReadReleaseRequest(uint8_t *data,
size_t size,
ObjectID *object_id,
bool *may_unmap);

Status SendReleaseReply(const std::shared_ptr<Client> &client,
ObjectID object_id,
Expand Down
4 changes: 2 additions & 2 deletions src/ray/object_manager/plasma/shared_memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ class ClientMmapTableEntry {

~ClientMmapTableEntry();

uint8_t *pointer() { return reinterpret_cast<uint8_t *>(pointer_); }
uint8_t *pointer() const { return reinterpret_cast<uint8_t *>(pointer_); }

MEMFD_TYPE fd() { return fd_; }
MEMFD_TYPE fd() const { return fd_; }

private:
/// The associated file descriptor on the client.
Expand Down
18 changes: 16 additions & 2 deletions src/ray/object_manager/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -419,9 +419,23 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr<Client> &client,
ProcessGetRequest(client, object_ids_to_get, timeout_ms, is_from_worker);
} break;
case fb::MessageType::PlasmaReleaseRequest: {
RAY_RETURN_NOT_OK(ReadReleaseRequest(input, input_size, &object_id));
// May unmap: client knows a fallback-allocated fd is involved.
// Should unmap: server finds refcnt == 0 -> need to be unmapped.
bool may_unmap;
RAY_RETURN_NOT_OK(ReadReleaseRequest(input, input_size, &object_id, &may_unmap));
bool should_unmap = ReleaseObject(object_id, client);
RAY_RETURN_NOT_OK(SendReleaseReply(client, object_id, should_unmap, PlasmaError::OK));
if (!may_unmap) {
RAY_CHECK(!should_unmap)
<< "Plasma client thinks a mmap should not be unmapped but server thinks so. "
"This should not happen because a client knows the object is "
"fallback-allocated in Get/Create time. Object ID: "
<< object_id;
}
if (may_unmap) {
RAY_RETURN_NOT_OK(
SendReleaseReply(client, object_id, should_unmap, PlasmaError::OK));
}

} break;
case fb::MessageType::PlasmaDeleteRequest: {
std::vector<ObjectID> object_ids;
Expand Down

0 comments on commit 8c63f91

Please sign in to comment.