Merged
4 changes: 0 additions & 4 deletions python/ray/_private/internal_api.py
@@ -168,10 +168,6 @@ def store_stats_summary(reply):
),
)
)
if reply.store_stats.consumed_bytes > 0:
store_summary += "Objects consumed by Ray tasks: {} MiB.\n".format(
int(reply.store_stats.consumed_bytes / (1024 * 1024))
)
if reply.store_stats.object_pulls_queued:
store_summary += "Object fetches queued, waiting for available memory."

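Since store_stats_summary no longer emits the "Objects consumed by Ray tasks" line, any caller that greps the memory summary for it now comes up empty. A minimal sketch of such a check, using only the public memory_summary helper that the tests below import; the function name here is hypothetical:

from ray._private.internal_api import memory_summary


def summary_mentions_consumed_bytes(address: str) -> bool:
    # With the consumed_bytes line removed from store_stats_summary, no line
    # of the summary should mention consumed bytes anymore, so this is False.
    summary = memory_summary(address=address, stats_only=True)
    return any("consumed" in line for line in summary.split("\n"))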
49 changes: 4 additions & 45 deletions python/ray/tests/test_object_spilling.py
@@ -23,7 +23,6 @@
)
from ray._private.internal_api import memory_summary
from ray._common.test_utils import wait_for_condition
from ray._raylet import GcsClientOptions
import ray.remote_function
from ray.tests.conftest import (
buffer_object_spilling_config,
@@ -65,26 +64,6 @@ def is_dir_empty(temp_folder, node_id, append_path=True):
return num_files == 0


def assert_no_thrashing(address):
state = ray._private.state.GlobalState()
options = GcsClientOptions.create(
address, None, allow_cluster_id_nil=True, fetch_cluster_id_if_nil=False
)
state._initialize_global_state(options)
summary = memory_summary(address=address, stats_only=True)
restored_bytes = 0
consumed_bytes = 0

for line in summary.split("\n"):
if "Restored" in line:
restored_bytes = int(line.split(" ")[1])
if "consumed" in line:
consumed_bytes = int(line.split(" ")[-2])
assert (
consumed_bytes >= restored_bytes
), f"consumed: {consumed_bytes}, restored: {restored_bytes}"


@pytest.mark.skipif(platform.system() == "Windows", reason="Doesn't support Windows.")
def test_spill_file_uniqueness(shutdown_only):
ray_context = ray.init(num_cpus=0, object_store_memory=75 * 1024 * 1024)
@@ -462,7 +441,6 @@ def test_spilling_not_done_for_pinned_object(object_spilling_config, shutdown_only):

print(type(temp_folder))
wait_for_condition(lambda: is_dir_empty(temp_folder, ray_context["node_id"]))
assert_no_thrashing(ray_context["address"])


def test_spill_remote_object(
@@ -509,14 +487,13 @@ def depends(arg):

# Test passing the spilled object as an arg to another task.
ray.get(depends.remote(ref))
assert_no_thrashing(cluster.address)


@pytest.mark.skipif(platform.system() == "Windows", reason="Hangs on Windows.")
def test_spill_objects_automatically(fs_only_object_spilling_config, shutdown_only):
# Limit our object store to 75 MiB of memory.
object_spilling_config, _ = fs_only_object_spilling_config
address = ray.init(
ray.init(
num_cpus=1,
object_store_memory=75 * 1024 * 1024,
_system_config={
@@ -548,7 +525,6 @@ def test_spill_objects_automatically(fs_only_object_spilling_config, shutdown_only):
solution = solution_buffer[index]
sample = ray.get(ref, timeout=None)
assert np.array_equal(sample, solution)
assert_no_thrashing(address["address"])


@pytest.mark.skipif(
@@ -558,7 +534,7 @@ def test_spill_objects_automatically(fs_only_object_spilling_config, shutdown_only):
def test_unstable_spill_objects_automatically(unstable_spilling_config, shutdown_only):
# Limit our object store to 75 MiB of memory.
object_spilling_config, _ = unstable_spilling_config
address = ray.init(
ray.init(
num_cpus=1,
object_store_memory=75 * 1024 * 1024,
_system_config={
@@ -588,13 +564,12 @@ def test_unstable_spill_objects_automatically(unstable_spilling_config, shutdown_only):
solution = solution_buffer[index]
sample = ray.get(ref, timeout=None)
assert np.array_equal(sample, solution)
assert_no_thrashing(address["address"])


def test_slow_spill_objects_automatically(slow_spilling_config, shutdown_only):
# Limit our object store to 75 MiB of memory.
object_spilling_config, _ = slow_spilling_config
address = ray.init(
ray.init(
num_cpus=1,
object_store_memory=75 * 1024 * 1024,
_system_config={
@@ -626,7 +601,6 @@ def test_slow_spill_objects_automatically(slow_spilling_config, shutdown_only):
solution = solution_buffer[index]
sample = ray.get(ref, timeout=None)
assert np.array_equal(sample, solution)
assert_no_thrashing(address["address"])


def test_spill_stats(object_spilling_config, shutdown_only):
@@ -662,27 +636,13 @@ def f():
assert "Spilled 200 MiB, 4 objects" in s, s
assert "Restored 150 MiB, 3 objects" in s, s

# Test if consumed bytes are correctly calculated.
obj = ray.put(np.zeros(30 * 1024 * 1024, dtype=np.uint8))

@ray.remote
def func_with_ref(obj):
return True

ray.get(func_with_ref.remote(obj))

s = memory_summary(address=address["address"], stats_only=True)
# 50MB * 5 references + 30MB used for task execution.
assert "Objects consumed by Ray tasks: 280 MiB." in s, s
assert_no_thrashing(address["address"])


@pytest.mark.skipif(platform.system() == "Darwin", reason="Failing on macOS.")
@pytest.mark.asyncio
@pytest.mark.parametrize("is_async", [False, True])
async def test_spill_during_get(object_spilling_config, shutdown_only, is_async):
object_spilling_config, _ = object_spilling_config
address = ray.init(
ray.init(
num_cpus=1,
object_store_memory=100 * 1024 * 1024,
_system_config={
@@ -735,7 +695,6 @@ def f():
assert duration <= timedelta(
seconds=timeout_seconds
), "Concurrent gets took too long. Maybe IO workers are not started properly." # noqa: E501
assert_no_thrashing(address["address"])


@pytest.mark.parametrize(
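The deleted assert_no_thrashing helper compared the consumed-bytes figure against restored bytes; with consumed bytes gone, only the spilled and restored lines remain in the summary, as the assertions in test_spill_stats show. A rough sketch of extracting those numbers, with the line format assumed from the strings asserted above ("Spilled 200 MiB, 4 objects", "Restored 150 MiB, 3 objects") and an illustrative helper name:

import re

from ray._private.internal_api import memory_summary


def spilled_and_restored_mib(address: str):
    # Parse spilled/restored MiB out of the stats-only memory summary.
    summary = memory_summary(address=address, stats_only=True)
    spilled_mib = restored_mib = 0
    for line in summary.split("\n"):
        spilled = re.search(r"Spilled (\d+) MiB", line)
        if spilled:
            spilled_mib = int(spilled.group(1))
        restored = re.search(r"Restored (\d+) MiB", line)
        if restored:
            restored_mib = int(restored.group(1))
    return spilled_mib, restored_mib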
13 changes: 3 additions & 10 deletions python/ray/tests/test_object_spilling_2.py
@@ -11,7 +11,7 @@
import ray
from ray._common.test_utils import wait_for_condition
from ray._private.test_utils import run_string_as_driver
from ray.tests.test_object_spilling import assert_no_thrashing, is_dir_empty
from ray.tests.test_object_spilling import is_dir_empty
from ray._private.external_storage import (
FileSystemStorage,
)
@@ -54,7 +54,6 @@ def test_delete_objects(object_spilling_config, shutdown_only):
lambda: is_dir_empty(temp_folder, ray_context["node_id"]),
timeout=condition_wait_timeout,
)
assert_no_thrashing(ray_context["address"])


def test_delete_objects_delete_while_creating(object_spilling_config, shutdown_only):
@@ -96,7 +95,6 @@ def test_delete_objects_delete_while_creating(object_spilling_config, shutdown_only):
lambda: is_dir_empty(temp_folder, ray_context["node_id"]),
timeout=condition_wait_timeout,
)
assert_no_thrashing(ray_context["address"])


@pytest.mark.skipif(platform.system() in ["Windows"], reason="Failing on Windows.")
@@ -160,7 +158,6 @@ def wait_until_actor_dead():
lambda: is_dir_empty(temp_folder, ray_context["node_id"]),
timeout=condition_wait_timeout,
)
assert_no_thrashing(ray_context["address"])


@pytest.mark.skipif(platform.system() in ["Windows"], reason="Failing on Windows.")
@@ -274,14 +271,13 @@ def wait_until_actor_dead(actor):
lambda: is_dir_empty(temp_folder, worker_node2.node_id),
timeout=condition_wait_timeout,
)
assert_no_thrashing(cluster.address)


def test_fusion_objects(fs_only_object_spilling_config, shutdown_only):
# Limit our object store to 75 MiB of memory.
object_spilling_config, temp_folder = fs_only_object_spilling_config
min_spilling_size = 10 * 1024 * 1024
address = ray.init(
ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 3,
@@ -327,13 +323,12 @@ def test_fusion_objects(fs_only_object_spilling_config, shutdown_only):
if file_size >= min_spilling_size:
is_test_passing = True
assert is_test_passing
assert_no_thrashing(address["address"])


# https://github.com/ray-project/ray/issues/12912
def test_release_resource(object_spilling_config, shutdown_only):
object_spilling_config, temp_folder = object_spilling_config
address = ray.init(
ray.init(
num_cpus=1,
object_store_memory=75 * 1024 * 1024,
_system_config={
@@ -362,7 +357,6 @@ def f(dep):
canary = sneaky_task_tries_to_steal_released_resources.remote()
ready, _ = ray.wait([canary], timeout=2)
assert not ready
assert_no_thrashing(address["address"])


def test_spill_objects_on_object_transfer(
@@ -419,7 +413,6 @@ def allocate(*args):
# spilling.
tasks = [foo.remote(*task_args) for task_args in args]
ray.get(tasks)
assert_no_thrashing(cluster.address)


@pytest.mark.skipif(
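The spilling tests in this file all drive spilling through the same style of _system_config; a self-contained sketch of that setup, with an illustrative local-filesystem spilling config (the real tests take theirs from the conftest fixtures):

import json

import ray

# Illustrative spilling config; the directory path is arbitrary for this sketch.
spilling_config = json.dumps(
    {"type": "filesystem", "params": {"directory_path": "/tmp/ray_spill_example"}}
)

ray.init(
    object_store_memory=75 * 1024 * 1024,  # cap the object store at 75 MiB
    _system_config={
        "max_io_workers": 3,
        # Fuse spill requests until they reach at least this many bytes,
        # which is what test_fusion_objects checks via the spill file sizes.
        "min_spilling_size": 10 * 1024 * 1024,
        "object_spilling_config": spilling_config,
    },
)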
6 changes: 2 additions & 4 deletions python/ray/tests/test_object_spilling_3.py
@@ -14,7 +14,7 @@
import ray
from ray._common.test_utils import wait_for_condition
from ray.cluster_utils import Cluster, cluster_not_supported
from ray.tests.test_object_spilling import assert_no_thrashing, is_dir_empty
from ray.tests.test_object_spilling import is_dir_empty

# Note: Disk write speed can be as low as 6 MiB/s in AWS Mac instances, so we have to
# increase the timeout.
@@ -82,7 +82,6 @@ def test_multiple_directories(tmp_path, shutdown_only):
for temp_dir in temp_dirs:
temp_folder = temp_dir
wait_for_condition(lambda: is_dir_empty(temp_folder, ray_context["node_id"]))
assert_no_thrashing(ray_context["address"])

# Now kill ray and see all directories are deleted.
print("Check directories are deleted...")
@@ -296,7 +295,7 @@ def run_workload():
def test_spill_deadlock(object_spilling_config, shutdown_only):
object_spilling_config, _ = object_spilling_config
# Limit our object store to 75 MiB of memory.
address = ray.init(
ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 1,
@@ -321,7 +320,6 @@ def test_spill_deadlock(object_spilling_config, shutdown_only):
ref = random.choice(replay_buffer)
sample = ray.get(ref, timeout=None)
assert np.array_equal(sample, arr)
assert_no_thrashing(address["address"])


def test_spill_reconstruction_errors(ray_start_cluster, object_spilling_config):
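With assert_no_thrashing removed from this file as well, the remaining checks wait for the spill directory to drain. A small sketch of that pattern using only helpers this file already imports; the wrapper name and timeout are illustrative:

from ray._common.test_utils import wait_for_condition
from ray.tests.test_object_spilling import is_dir_empty


def wait_for_spill_dir_to_drain(temp_folder, node_id, timeout_s=600):
    # is_dir_empty returns True once no spilled files remain for this node,
    # mirroring the wait_for_condition checks used in the tests above.
    wait_for_condition(lambda: is_dir_empty(temp_folder, node_id), timeout=timeout_s)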
3 changes: 1 addition & 2 deletions src/mock/ray/object_manager/plasma/client.h
@@ -41,8 +41,7 @@ class MockPlasmaClient : public PlasmaClientInterface {
Get,
(const std::vector<ObjectID> &object_ids,
int64_t timeout_ms,
std::vector<ObjectBuffer> *object_buffers,
bool is_from_worker),
std::vector<ObjectBuffer> *object_buffers),
(override));

MOCK_METHOD(Status,
9 changes: 2 additions & 7 deletions src/ray/core_worker/store_provider/plasma_store_provider.cc
@@ -185,10 +185,7 @@ Status CoreWorkerPlasmaStoreProvider::PullObjectsAndGetFromPlasmaStore(
RAY_RETURN_NOT_OK(raylet_client_->AsyncGetObjects(batch_ids, owner_addresses));

std::vector<plasma::ObjectBuffer> plasma_results;
RAY_RETURN_NOT_OK(store_client_->Get(batch_ids,
timeout_ms,
&plasma_results,
/*is_from_worker=*/true));
RAY_RETURN_NOT_OK(store_client_->Get(batch_ids, timeout_ms, &plasma_results));

// Add successfully retrieved objects to the result map and remove them from
// the set of IDs to get.
@@ -225,11 +222,9 @@ Status CoreWorkerPlasmaStoreProvider::GetIfLocal(
const std::vector<ObjectID> &object_ids,
absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> *results) {
std::vector<plasma::ObjectBuffer> plasma_results;
// Since this path is used only for spilling, we should set is_from_worker: false.
RAY_RETURN_NOT_OK(store_client_->Get(object_ids,
/*timeout_ms=*/0,
&plasma_results,
/*is_from_worker=*/false));
&plasma_results));

for (size_t i = 0; i < object_ids.size(); i++) {
if (plasma_results[i].data != nullptr || plasma_results[i].metadata != nullptr) {
3 changes: 1 addition & 2 deletions src/ray/object_manager/object_buffer_pool.cc
@@ -80,8 +80,7 @@ ObjectBufferPool::CreateObjectReader(const ObjectID &object_id,

std::vector<ObjectID> object_ids{object_id};
std::vector<plasma::ObjectBuffer> object_buffers(1);
RAY_CHECK_OK(
store_client_->Get(object_ids, 0, &object_buffers, /*is_from_worker=*/false));
RAY_CHECK_OK(store_client_->Get(object_ids, 0, &object_buffers));
if (object_buffers[0].data == nullptr) {
RAY_LOG(INFO)
<< "Failed to get a chunk of the object: " << object_id
1 change: 0 additions & 1 deletion src/ray/object_manager/object_manager.cc
@@ -802,7 +802,6 @@ void ObjectManager::FillObjectStoreStats(rpc::GetNodeStatsReply *reply) const {
plasma::plasma_store_runner->GetFallbackAllocated());
stats->set_object_store_bytes_avail(config_.object_store_memory);
stats->set_num_local_objects(local_objects_.size());
stats->set_consumed_bytes(plasma::plasma_store_runner->GetConsumedBytes());
stats->set_cumulative_created_objects(
plasma::plasma_store_runner->GetCumulativeCreatedObjects());
stats->set_cumulative_created_bytes(
29 changes: 10 additions & 19 deletions src/ray/object_manager/plasma/client.cc
@@ -131,14 +131,12 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Impl> {

Status Get(const std::vector<ObjectID> &object_ids,
int64_t timeout_ms,
std::vector<ObjectBuffer> *object_buffers,
bool is_from_worker);
std::vector<ObjectBuffer> *object_buffers);

Status Get(const ObjectID *object_ids,
int64_t num_objects,
int64_t timeout_ms,
ObjectBuffer *object_buffers,
bool is_from_worker);
ObjectBuffer *object_buffers);

Status GetExperimentalMutableObject(const ObjectID &object_id,
std::unique_ptr<MutableObject> *mutable_object);
@@ -187,8 +185,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Impl> {
Status GetBuffers(const ObjectID *object_ids,
int64_t num_objects,
int64_t timeout_ms,
ObjectBuffer *object_buffers,
bool is_from_worker);
ObjectBuffer *object_buffers);

uint8_t *LookupMmappedFile(MEMFD_TYPE store_fd_val) const;

@@ -463,8 +460,7 @@ Status PlasmaClient::Impl::TryCreateImmediately(const ObjectID &object_id,
Status PlasmaClient::Impl::GetBuffers(const ObjectID *object_ids,
int64_t num_objects,
int64_t timeout_ms,
ObjectBuffer *object_buffers,
bool is_from_worker) {
ObjectBuffer *object_buffers) {
// Fill out the info for the objects that are already in use locally.
bool all_present = true;
for (int64_t i = 0; i < num_objects; ++i) {
@@ -517,8 +513,7 @@ Status PlasmaClient::Impl::GetBuffers(const ObjectID *object_ids,
for (int64_t i = 0; i < num_objects; i++) {
RAY_LOG(DEBUG) << "Sending get request " << object_ids[i];
}
RAY_RETURN_NOT_OK(SendGetRequest(
store_conn_, &object_ids[0], num_objects, timeout_ms, is_from_worker));
RAY_RETURN_NOT_OK(SendGetRequest(store_conn_, &object_ids[0], num_objects, timeout_ms));
std::vector<uint8_t> buffer;
RAY_RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaGetReply, &buffer));
std::vector<ObjectID> received_object_ids(num_objects);
@@ -630,13 +625,11 @@ Status PlasmaClient::Impl::GetExperimentalMutableObject(

Status PlasmaClient::Impl::Get(const std::vector<ObjectID> &object_ids,
int64_t timeout_ms,
std::vector<ObjectBuffer> *out,
bool is_from_worker) {
std::vector<ObjectBuffer> *out) {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
const size_t num_objects = object_ids.size();
*out = std::vector<ObjectBuffer>(num_objects);
return GetBuffers(
object_ids.data(), num_objects, timeout_ms, out->data(), is_from_worker);
return GetBuffers(object_ids.data(), num_objects, timeout_ms, out->data());
}

Status PlasmaClient::Impl::MarkObjectUnused(const ObjectID &object_id) {
@@ -922,18 +915,16 @@ Status PlasmaClient::TryCreateImmediately(const ObjectID &object_id,

Status PlasmaClient::Get(const std::vector<ObjectID> &object_ids,
int64_t timeout_ms,
std::vector<ObjectBuffer> *object_buffers,
bool is_from_worker) {
return impl_->Get(object_ids, timeout_ms, object_buffers, is_from_worker);
std::vector<ObjectBuffer> *object_buffers) {
return impl_->Get(object_ids, timeout_ms, object_buffers);
}

Status PlasmaClient::GetExperimentalMutableObject(
const ObjectID &object_id, std::unique_ptr<MutableObject> *mutable_object) {
// First make sure the object is in scope. The ObjectBuffer will keep the
// value pinned in the plasma store.
std::vector<ObjectBuffer> object_buffers;
RAY_RETURN_NOT_OK(impl_->Get(
{object_id}, /*timeout_ms=*/0, &object_buffers, /*is_from_worker=*/true));
RAY_RETURN_NOT_OK(impl_->Get({object_id}, /*timeout_ms=*/0, &object_buffers));
if (!object_buffers[0].data) {
return Status::Invalid(
"Experimental mutable object must be in the local object store to register as "