[object_buffer_pool] reduce mutex lock scope in WriteChunk (ray-project#43434)

Signed-off-by: Saurabh Vishwas Joshi <sjoshi@pinterest.com>

## Why are these changes needed?
Object store network transfer performance is slow, and we observe periodic bursts of network usage separated by gaps.

A burst of inbound network traffic occurs at the beginning of each `ray.get(obj_refs)` call, followed by a wide gap of idle network, and then another burst at the start of the next `ray.get(obj_refs)` call.

This looks like a processing bottleneck when payloads are received over the network on the Pull side. 

We dug into the code in [object_manager](https://github.com/ray-project/ray/tree/91d5af69085897b02d29bc0d15a53849e56eb8e4/src/ray/object_manager) and found the following:

- Objects are transferred in chunks of 5 MiB.
- When a `PushRequest` is received for a chunk, it is processed by [ObjectManager::HandlePush](https://github.com/ray-project/ray/blob/7ff3969159d3aeac00415ac26bf96a63f782db86/src/ray/object_manager/object_manager.cc#L562).
- `HandlePush` internally calls [ObjectManager::ReceiveObjectChunk](https://github.com/ray-project/ray/blob/7ff3969159d3aeac00415ac26bf96a63f782db86/src/ray/object_manager/object_manager.cc#L623), which in turn calls [ObjectBufferPool::WriteChunk](https://github.com/ray-project/ray/blob/91d5af69085897b02d29bc0d15a53849e56eb8e4/src/ray/object_manager/object_buffer_pool.h#L128).
- The `WriteChunk` function is [mutex-guarded](https://github.com/ray-project/ray/blob/91d5af69085897b02d29bc0d15a53849e56eb8e4/src/ray/object_manager/object_buffer_pool.cc#L122) throughout its execution.
- This includes the [std::memcpy](https://github.com/ray-project/ray/blob/91d5af69085897b02d29bc0d15a53849e56eb8e4/src/ray/object_manager/object_buffer_pool.cc#L139) call for a 5 MiB payload.
- The [pool_mutex_](https://github.com/ray-project/ray/blob/91d5af69085897b02d29bc0d15a53849e56eb8e4/src/ray/object_manager/object_buffer_pool.h#L215) lock is shared by all `object_id`s being received over the network.

This leads us to believe that even when chunks for different `ObjectID`s are received in parallel over the network, they are written sequentially, which would explain the bursts of network usage followed by idle gaps (see the simplified sketch below).
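
As a rough illustration (a deliberately simplified, hypothetical sketch, not the actual Ray code: the class `Pool`, its members, and the use of `std::mutex` in place of `absl::Mutex` are stand-ins), holding one pool-wide mutex across the copy forces every receive thread to write its chunk one at a time, regardless of which object the chunk belongs to:

```cpp
#include <cstring>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for ObjectBufferPool, reduced to the part that matters:
// a single pool-wide mutex is held for the entire write, including the memcpy.
class Pool {
 public:
  void WriteChunk(const std::string &object_id, size_t offset, const std::string &data) {
    std::lock_guard<std::mutex> lock(pool_mutex_);  // held for the whole call
    auto &buffer = buffers_[object_id];
    if (buffer.size() < offset + data.size()) {
      buffer.resize(offset + data.size());
    }
    // A ~5 MiB copy runs while every other receive thread waits on pool_mutex_,
    // so chunks for *different* objects are still written sequentially.
    std::memcpy(buffer.data() + offset, data.data(), data.size());
  }

 private:
  std::mutex pool_mutex_;  // shared across all object_ids, like pool_mutex_ in the PR
  std::unordered_map<std::string, std::vector<char>> buffers_;
};
```

With this structure, the time spent holding the pool-wide lock is dominated by the copy itself, which matches the serialized, bursty receive behavior observed above.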

### Changes

**WriteChunk**
- Transition the chunk from `REFERENCED` to `SEALED` before releasing the lock
- Increment / decrement `num_inflight_copies` before / after the copy
- Perform an unguarded `memcpy` of the chunk into the buffer
- Reacquire the mutex lock and make the object_id-level `Seal` and `Release` decisions

**AbortCreate**
- Wait until `num_inflight_copies == 0` before allowing the `Release` and `Abort` calls for the `object_id`
- This ensures that we do not release the underlying buffer while an unguarded copy is still in flight (see the sketch below)
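
To make the new flow concrete, here is a condensed, hypothetical sketch of the locking pattern described above (the real change is the diff to `ObjectBufferPool` further down; `Pool`, `BufferState`, the string-keyed map, and the offset-based signature are simplified stand-ins, while `pool_mutex_`, `num_inflight_copies`, and `num_seals_remaining` mirror the names used in the PR; object creation and buffer sizing are omitted):

```cpp
#include <cstdint>
#include <cstring>
#include <string>
#include <unordered_map>
#include <vector>

#include "absl/synchronization/mutex.h"

// Hypothetical, condensed stand-in for ObjectBufferPool illustrating the new
// three-phase WriteChunk (lock -> unguarded copy -> lock) and the AbortCreate
// barrier on num_inflight_copies.
class Pool {
 public:
  void WriteChunk(const std::string &object_id, size_t offset, const std::string &data) {
    char *dst = nullptr;
    {
      absl::MutexLock lock(&pool_mutex_);
      auto it = buffers_.find(object_id);
      if (it == buffers_.end()) return;  // create was aborted; drop the chunk
      // Resolve the destination under the lock; data is pre-sized by the (omitted)
      // object-creation path. (The real code also flips the chunk's state
      // REFERENCED -> SEALED here so no other thread rewrites the same chunk.)
      dst = it->second.data.data() + offset;
      it->second.num_inflight_copies++;  // keeps AbortCreate from releasing the buffer
    }

    // Unguarded copy: receive threads for other chunks/objects can copy in parallel.
    std::memcpy(dst, data.data(), data.size());

    {
      absl::MutexLock lock(&pool_mutex_);
      auto it = buffers_.find(object_id);  // still present: abort waits for inflight == 0
      it->second.num_inflight_copies--;
      it->second.num_seals_remaining--;
      if (it->second.num_seals_remaining == 0) {
        // Last chunk: Seal/Release the object in the store, then drop the bookkeeping.
        buffers_.erase(it);
      }
    }
  }

  void AbortCreate(const std::string &object_id) {
    absl::MutexLock lock(&pool_mutex_);
    auto no_copy_inflight = [this, &object_id]() {
      pool_mutex_.AssertReaderHeld();
      auto it = buffers_.find(object_id);
      return it == buffers_.end() || it->second.num_inflight_copies == 0;
    };
    // Atomically releases pool_mutex_, waits for the condition, and re-acquires it,
    // so the buffer is only released once no copy is writing into it.
    pool_mutex_.Await(absl::Condition(&no_copy_inflight));
    buffers_.erase(object_id);
  }

 private:
  struct BufferState {
    std::vector<char> data;
    uint64_t num_seals_remaining = 0;
    uint64_t num_inflight_copies = 0;
  };
  absl::Mutex pool_mutex_;
  std::unordered_map<std::string, BufferState> buffers_;
};
```

The key point is that the expensive `memcpy` no longer runs under `pool_mutex_`, while the inflight counter plus the `Await` condition preserve the original guarantee that a buffer is never released mid-copy.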


### Tests

#### Before

- Sampled network at 1 second frequency

`speedometer.py -i 1 -m 64424509440 -n 1073741824 -rx eth0`
<img width="590" alt="Screenshot 2024-02-13 at 10 26 33 AM" src="https://github.com/ray-project/ray/assets/8691593/7a5497dd-b87d-4bec-a51c-62d629c06c58">

- Sampled network at 100 millisecond frequency

`speedometer.py -i 0.1 -m 64424509440 -n 1073741824 -rx eth0`
<img width="656" alt="Screenshot 2024-02-13 at 10 26 37 AM" src="https://github.com/ray-project/ray/assets/8691593/5ac229e6-4777-4820-b9e7-ebbd63cafcc9">


#### After

- Sampled network at 1 second frequency
`speedometer.py -i 1 -m 64424509440 -n 1073741824 -rx eth0`
<img width="620" alt="Screenshot 2024-02-21 at 1 22 08 PM" src="https://github.com/ray-project/ray/assets/8691593/f15196d6-01f7-4d3e-b56c-ba09c58be455">


- Sampled network at 100 millisecond frequency
`speedometer.py -i 0.1 -m 64424509440 -n 1073741824 -rx eth0` 
<img width="1674" alt="Screenshot 2024-02-21 at 1 19 10 PM" src="https://github.com/ray-project/ray/assets/8691593/584f64b9-a39f-4441-b034-86234aff4283">

```
Finished benchmark for total_size_MiB: 102400, block_size_MiB: 1024, parallel_block: None
	ray.wait(fetch_local=True) Gbps: 54.67178658153866
	ray.wait(fetch_local=True) total time s: 15.711823463439941
	ray.get() Gbps: 186301.24111362518
	ray.get() total time s: 0.004610776901245117
```

## Related issue number
sjoshi6 authored and iamyangchen committed Mar 25, 2024
1 parent b8d0cb3 commit 7692d63
Showing 2 changed files with 62 additions and 25 deletions.
**src/ray/object_manager/object_buffer_pool.cc** (85 changes: 60 additions & 25 deletions)

```diff
@@ -14,6 +14,8 @@

 #include "ray/object_manager/object_buffer_pool.h"

+#include <optional>
+
 #include "absl/time/time.h"
 #include "ray/common/status.h"
 #include "ray/util/logging.h"
@@ -119,32 +121,57 @@ void ObjectBufferPool::WriteChunk(const ObjectID &object_id,
                                   uint64_t metadata_size,
                                   const uint64_t chunk_index,
                                   const std::string &data) {
-  absl::MutexLock lock(&pool_mutex_);
-  auto it = create_buffer_state_.find(object_id);
-  if (it == create_buffer_state_.end() || chunk_index >= it->second.chunk_state.size() ||
-      it->second.chunk_state.at(chunk_index) != CreateChunkState::REFERENCED) {
-    RAY_LOG(DEBUG) << "Object " << object_id << " aborted before chunk " << chunk_index
-                   << " could be sealed";
-    return;
-  }
-  if (it->second.data_size != data_size || it->second.metadata_size != metadata_size) {
-    RAY_LOG(DEBUG) << "Object " << object_id << " size mismatch, rejecting chunk";
-    return;
+  std::optional<ObjectBufferPool::ChunkInfo> chunk_info;
+  {
+    absl::MutexLock lock(&pool_mutex_);
+    auto it = create_buffer_state_.find(object_id);
+    if (it == create_buffer_state_.end() ||
+        chunk_index >= it->second.chunk_state.size() ||
+        it->second.chunk_state.at(chunk_index) != CreateChunkState::REFERENCED) {
+      RAY_LOG(DEBUG) << "Object " << object_id << " aborted before chunk " << chunk_index
+                     << " could be sealed";
+      return;
+    }
+    if (it->second.data_size != data_size || it->second.metadata_size != metadata_size) {
+      RAY_LOG(DEBUG) << "Object " << object_id << " size mismatch, rejecting chunk";
+      return;
+    }
+    RAY_CHECK(it->second.chunk_info.size() > chunk_index);
+
+    chunk_info = it->second.chunk_info.at(chunk_index);
+    RAY_CHECK(data.size() == chunk_info->buffer_length)
+        << "size mismatch! data size: " << data.size()
+        << " chunk size: " << chunk_info->buffer_length;
+
+    // Update the state from REFERENCED To SEALED before releasing the lock to ensure
+    // that no other thread sees a REFERENCED state.
+    it->second.chunk_state.at(chunk_index) = CreateChunkState::SEALED;
+    // Increment the number of inflight copies to ensure Abort
+    // does not release the buffer.
+    it->second.num_inflight_copies++;
   }
-  RAY_CHECK(it->second.chunk_info.size() > chunk_index);
-  auto &chunk_info = it->second.chunk_info.at(chunk_index);
-  RAY_CHECK(data.size() == chunk_info.buffer_length)
-      << "size mismatch! data size: " << data.size()
-      << " chunk size: " << chunk_info.buffer_length;
-  std::memcpy(chunk_info.data, data.data(), chunk_info.buffer_length);
-  it->second.chunk_state.at(chunk_index) = CreateChunkState::SEALED;
-  it->second.num_seals_remaining--;
-  if (it->second.num_seals_remaining == 0) {
-    RAY_CHECK_OK(store_client_->Seal(object_id));
-    RAY_CHECK_OK(store_client_->Release(object_id));
-    create_buffer_state_.erase(it);
-    RAY_LOG(DEBUG) << "Have received all chunks for object " << object_id
-                   << ", last chunk index: " << chunk_index;
+
+  RAY_CHECK(chunk_info.has_value()) << "chunk_info is not set";
+  // The num_inflight_copies is used to ensure that another thread cannot call Release
+  // on the object_id, which makes the unguarded copy call safe.
+  std::memcpy(chunk_info->data, data.data(), chunk_info->buffer_length);
+
+  {
+    // Ensure the process of object_id Seal and Release is mutex guarded.
+    absl::MutexLock lock(&pool_mutex_);
+    auto it = create_buffer_state_.find(object_id);
+    // Abort cannot be called during inflight copy operations.
+    RAY_CHECK(it != create_buffer_state_.end());
+    // Decrement the number of inflight copies to ensure Abort can release the buffer.
+    it->second.num_inflight_copies--;
+    it->second.num_seals_remaining--;
+    if (it->second.num_seals_remaining == 0) {
+      RAY_CHECK_OK(store_client_->Seal(object_id));
+      RAY_CHECK_OK(store_client_->Release(object_id));
+      create_buffer_state_.erase(it);
+      RAY_LOG(DEBUG) << "Have received all chunks for object " << object_id
+                     << ", last chunk index: " << chunk_index;
+    }
   }
 }

@@ -154,6 +181,14 @@ void ObjectBufferPool::AbortCreate(const ObjectID &object_id) {
 }

 void ObjectBufferPool::AbortCreateInternal(const ObjectID &object_id) {
+  auto no_copy_inflight = [this, object_id]() {
+    pool_mutex_.AssertReaderHeld();
+    auto it = create_buffer_state_.find(object_id);
+    return it == create_buffer_state_.end() || it->second.num_inflight_copies == 0;
+  };
+
+  pool_mutex_.Await(absl::Condition(&no_copy_inflight));
+  // Mutex is acquired, no copy inflight, safe to abort the object_id.
   auto it = create_buffer_state_.find(object_id);
   if (it != create_buffer_state_.end()) {
     RAY_CHECK_OK(store_client_->Release(object_id));
```

**src/ray/object_manager/object_buffer_pool.h** (2 changes: 2 additions & 0 deletions)

```diff
@@ -195,6 +195,8 @@ class ObjectBufferPool {
     std::vector<CreateChunkState> chunk_state;
     /// The number of chunks left to seal before the buffer is sealed.
     uint64_t num_seals_remaining;
+    /// The number of inflight copy operations.
+    uint64_t num_inflight_copies = 0;
   };

   /// Returned when GetChunk or CreateChunk fails.
```
