
[Data] Streaming executor logging verbosity not configurable? #42191

Closed
bdewilde opened this issue Jan 5, 2024 · 5 comments
Labels: bug (Something that is supposed to be working; but isn't), data (Ray Data-related issues), P2 (Important issue, but not time-critical)

Comments

bdewilde commented Jan 5, 2024

What happened + What you expected to happen

Any time I perform a set of operations on a ray.data.Dataset, I see at least three info-level logging messages from streaming_executor.py:

  • the first describes the DAG it is executing
  • the next describes the execution config, which is typically the same every time
  • the last offers a "tip" for enabling even more verbose logging, which is the same every time

I would like fewer logging messages from the streaming executor. I've tried several different approaches without success:

  • increasing the logging level for the "ray" and/or "ray.data" loggers from where I'm invoking the dataset operations
  • adding a logging filter to the above loggers, specifically targeting the aforementioned "tip" message
  • doing the above two steps but wrapped in a function that I pass into ray via ray.init(runtime_env={"worker_process_setup_hook": shut_up_streaming_executor_func})
  • a couple of wilder swings that didn't work and aren't worth recounting here (the first three attempts are sketched below)

I expect this logging to be configurable; I just don't know how to do it. Yes, I've read through this docs page.
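Concretely, the attempts looked roughly like this (a sketch; shut_up_streaming_executor_func is the hook from the last bullet):

import logging
import ray

def shut_up_streaming_executor_func():
    # attempt 1: raise the level on the relevant loggers
    logging.getLogger("ray").setLevel(logging.ERROR)
    logging.getLogger("ray.data").setLevel(logging.ERROR)

    # attempt 2: drop the specific "Tip: ..." message with a filter
    class DropTip(logging.Filter):
        def filter(self, record):
            return not record.getMessage().startswith("Tip:")

    logging.getLogger("ray.data").addFilter(DropTip())

# attempt 3: run the same setup in every worker process as well
ray.init(runtime_env={"worker_process_setup_hook": shut_up_streaming_executor_func})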

Versions / Dependencies

macOS 14.1
Python 3.10
ray 2.9.0

Reproduction script

import ray.data
ray.data.range(10).map_batches(lambda df: df, batch_format="pandas").materialize()

outputs

2024-01-04 21:22:28,440	INFO set_read_parallelism.py:115 -- Using autodetected parallelism=20 for stage ReadRange to satisfy parallelism at least twice the available number of CPUs (10).
2024-01-04 21:22:28,440	INFO set_read_parallelism.py:122 -- To satisfy the requested parallelism of 20, each read task output is split into 2 smaller blocks.
2024-01-04 21:22:28,441	INFO streaming_executor.py:112 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange] -> TaskPoolMapOperator[MapBatches(<lambda>)]
2024-01-04 21:22:28,441	INFO streaming_executor.py:113 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-01-04 21:22:28,442	INFO streaming_executor.py:115 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`

Issue Severity

Low: It annoys or frustrates me.

@bdewilde added the bug (Something that is supposed to be working; but isn't) and triage (Needs triage) labels on Jan 5, 2024
@anyscalesam added the data (Ray Data-related issues) label on Jan 8, 2024
@scottjlee added the P2 (Important issue, but not time-critical) label and removed the triage label on Jan 8, 2024
scottjlee (Contributor) commented:

Currently we do not have a way to configure file/module-specific logging (without modifying the underlying classes/files). We could expose this as a parameter on DataContext specifically for streaming executor logs (e.g., generalize the enable_auto_log_stats parameter already on the class to gate stdout logging for streaming execution in general). @raulchen, thoughts on this? https://github.com/ray-project/ray/blob/master/python/ray/data/context.py#L299
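A hypothetical usage sketch of such a flag — verbose_execution_logs is an illustrative name only, not an existing parameter; only enable_auto_log_stats exists today:

import ray.data

ctx = ray.data.DataContext.get_current()
ctx.enable_auto_log_stats = False   # existing flag, gates stats logging
ctx.verbose_execution_logs = False  # hypothetical generalized flag proposed above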

Nintorac (Contributor) commented Jan 20, 2024

I think it would go a long way toward solving this to just print this information once, rather than on every batch. I'm using this with torch_iter_batches, if that makes a difference.

I've tried these to no avail

logging.getLogger("ray").setLevel(logging.ERROR)
logging.getLogger("ray.data").setLevel(logging.ERROR)
logging.getLogger("ray.data._internal.execution.streaming_executor").setLevel(logging.ERROR)

and my current workaround is to delete the offending lines from the installed modules, aha
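One more thing worth trying before editing files: filtering at the handler level instead of the logger level (a sketch, untested; it may still miss records emitted from worker processes):

import logging

class DropStreamingExecutorLogs(logging.Filter):
    def filter(self, record):
        # drop any record emitted from streaming_executor.py
        return "streaming_executor" not in record.pathname

for handler in logging.getLogger().handlers:
    handler.addFilter(DropStreamingExecutorLogs())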

here is a couple of seconds' worth of output (truncated below; the same lines repeat on every iteration):
2024-01-20 15:30:26,796 INFO streaming_executor.py:112 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange->MapBatches(f)]
2024-01-20 15:30:26,796 INFO streaming_executor.py:113 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-01-20 15:30:26,796 INFO streaming_executor.py:115 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
1it [00:05,  5.89s/it]
2024-01-20 15:30:32,687 INFO set_read_parallelism.py:115 -- Using autodetected parallelism=32 for stage ReadRange to satisfy parallelism at least twice the available number of CPUs (16).
2024-01-20 15:30:32,687 INFO streaming_executor.py:112 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange->MapBatches(f)]
2024-01-20 15:30:32,687 INFO streaming_executor.py:113 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-01-20 15:30:32,687 INFO streaming_executor.py:115 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[... the same set_read_parallelism/DAG/config/Tip lines repeat for every iteration, 2it through 19it, interleaving with and garbling the progress bar ...]

slnc commented Mar 5, 2024

My team uses Ray from Jupyter notebooks, and this issue is a big problem in that setting.

The official documentation says that you can configure the logging level of different Ray components, but that doesn't seem to work. ray.init(..., logging_level=logging.ERROR) doesn't work either.
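Concretely, this is what we tried, with no effect on the streaming executor output:

import logging
import ray

ray.init(logging_level=logging.ERROR)  # streaming_executor INFO lines still appear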


We use devcontainers, so workarounds like monkey-patching or deleting Ray logging lines aren't great.

murthyn commented Mar 5, 2024

Also running into this issue!

scottjlee (Contributor) commented:

#43360 and #43735 clean up a lot of the Ray Data logging to stdout. Full logs will still be written to the Ray Data log file.

You can try out the latest nightly, or wait for the upcoming Ray 2.10 release, which will include this change. Please feel free to re-open this issue with any further follow-up questions or requests!
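For anyone landing here later, on Ray >= 2.10 the standard logging controls should behave more predictably; something like the following is the intended usage (a sketch; exact behavior depends on your version):

import logging

# quiet Ray Data's stdout logging; full logs still go to the Ray Data log file
logging.getLogger("ray.data").setLevel(logging.WARNING)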
