
[core] Timeline shows Ray not reusing workers. #46658

Open
MaoZiming opened this issue Jul 16, 2024 · 0 comments
Labels: bug (Something that is supposed to be working; but isn't) · core (Issues that should be addressed in Ray Core) · P1 (Issue that should be fixed within a few weeks)


What happened + What you expected to happen

Based on the Ray timeline from the reproduction script below, I expected far fewer workers; instead the timeline shows many distinct workers, which suggests Ray is not reusing workers across tasks.

(screenshot: Ray timeline showing many distinct workers)
Possibly related issue: #36669
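
For anyone triaging, here is a minimal sketch of how I count distinct workers in the dump written by ray.timeline("test.json"). It assumes the file is Chrome-trace JSON, i.e. a list of event dicts where "tid" identifies the worker thread and duration events use "ph": "X"; that is the layout I see locally, not a documented API, so adjust the filter if your trace differs.

import json
from collections import Counter

# Count distinct worker threads that emitted duration events in the trace.
# Assumes Chrome-trace JSON: a list of dicts with "tid" (worker thread) and
# "ph" == "X" for duration events. Not a documented schema; adjust as needed.
with open("test.json") as f:
    events = json.load(f)

workers = Counter(e.get("tid") for e in events if e.get("ph") == "X")
print(f"{len(workers)} distinct worker threads in the timeline")
for tid, n in workers.most_common(10):
    print(f"  tid={tid}: {n} events")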

Versions / Dependencies

Version: ray-2.32.0

Reproduction script

import os
import time
import numpy as np
import ray

TIME_UNIT = 0.5

def main():

    os.environ["RAY_DATA_OP_RESERVATION_RATIO"] = "0"

    NUM_CPUS = 8
    NUM_GPUS = 4
    NUM_ROWS_PER_TASK = 10
    BUFFER_SIZE_LIMIT = 30
    NUM_TASKS = 10 * 8
    NUM_ROWS_TOTAL = NUM_ROWS_PER_TASK * NUM_TASKS  
    BLOCK_SIZE = 10 * 1024 * 1024 * 10

    # Producer: sleeps, then emits one ~100 MiB block per input row.
    def produce(batch):
        time.sleep(TIME_UNIT * 10)
        for id in batch["id"]:
            yield {
                "id": [id],
                "image": [np.zeros(BLOCK_SIZE, dtype=np.uint8)],
            }

    # CPU stage (runs with num_cpus=0.99 below).
    def consume(batch):
        time.sleep(TIME_UNIT)
        return {"id": batch["id"], "result": [0 for _ in batch["id"]]}

    # GPU stage (runs with num_gpus=1 below).
    def inference(batch):
        time.sleep(TIME_UNIT)
        return {"id": batch["id"]}

    data_context = ray.data.DataContext.get_current()
    data_context.execution_options.verbose_progress = True
    data_context.target_max_block_size = BLOCK_SIZE

    # Limit the object store to BUFFER_SIZE_LIMIT blocks' worth of memory.
    ray.init(num_cpus=NUM_CPUS, num_gpus=NUM_GPUS, object_store_memory=BUFFER_SIZE_LIMIT * BLOCK_SIZE)

    ds = ray.data.range(NUM_ROWS_TOTAL, override_num_blocks=NUM_TASKS)
    ds = ds.map_batches(produce, batch_size=NUM_ROWS_PER_TASK)
    ds = ds.map_batches(consume, batch_size=1, num_cpus=0.99)
    ds = ds.map_batches(inference, batch_size=1, num_gpus=1) 

    start_time = time.time()
    for _ in ds.iter_batches(batch_size=NUM_ROWS_PER_TASK):
        pass  # drain the dataset to execute the pipeline
    end_time = time.time()
    print(ds.stats())
    print(ray._private.internal_api.memory_summary(stats_only=True))
    print(f"Total time: {end_time - start_time:.4f}s")
    ray.timeline("test.json")  # dump the timeline shown in the screenshot above
    ray.shutdown()

if __name__ == "__main__": 
    main()
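
For context, my rough expectation (an assumption on my part, not something the script measures): with worker reuse, the worker pool should stay near the cluster's concurrency limits, roughly NUM_CPUS CPU workers plus NUM_GPUS GPU workers.

# Back-of-the-envelope expectation, assuming reused workers stay near the
# concurrency limits configured in the script above (8 CPUs, 4 GPUs).
NUM_CPUS, NUM_GPUS = 8, 4
expected_workers = NUM_CPUS + NUM_GPUS  # ~8 CPU workers + ~4 GPU workers
print(f"expected roughly {expected_workers} workers if workers were reused")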

Issue Severity

Medium: It is a significant difficulty but I can work around it.

MaoZiming added the bug and triage labels on Jul 16, 2024.
anyscalesam added the core label on Jul 24, 2024.
jjyao added the P1 label and removed the triage label on Jul 29, 2024.