Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Optimize dask.bag for file sizes + add dask.bag options to config #580

Closed
kalleknast opened this issue Oct 11, 2022 · 18 comments · Fixed by #611
Closed

ENH: Optimize dask.bag for file sizes + add dask.bag options to config #580

kalleknast opened this issue Oct 11, 2022 · 18 comments · Fixed by #611
Labels
ENH: enhancement enhancement; new feature or request

Comments

@kalleknast
Copy link

Problem
I'm prepping a big dataset (15 GB of WAV). When computing the spectrograms I run out of memory with the error: concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

Suggestion
This can be fixed by limiting the number of partitions used by dask.bag to 20. I.e. changing line 229 in audio.py from bag = db.from_sequence(audio_files) to bag = db.from_sequence(audio_files, npartitions=20) fixes the problem (setting partition_size would probably also work). The best would be to automatically figure out the number of workers and memory available and set npartitions/partition_size accordingly. Unfortunately, I cannot see a way to do that without additional dependencies.
An alternative would be to add the number of workers to the [PREP] section in config.toml.

@kalleknast kalleknast added the ENH: enhancement enhancement; new feature or request label Oct 11, 2022
@NickleDave
Copy link
Collaborator

NickleDave commented Oct 11, 2022

Hi @kalleknast, sorry you're running into this issue. Thank you for taking the time to provide a detailed suggestion on how to deal with it.

An alternative would be to add the number of workers to the [PREP] section in config.toml.

This seems like the easiest thing to do that would have the least impact on anyone else. I would be open to adding this option to [PREP]. You're imagining something like this?

[PREP]
...
npartitions = 20

A more standard .toml config (as described in #345) could have a vak.prep.dask table with npartitions and partition_size options, but just adding an npartitions would be a good-enough solution for now.

This can be fixed by limiting the number of partitions used by dask.bag to 20. I.e. changing line 229 in audio.py from bag = db.from_sequence(audio_files) to bag = db.from_sequence(audio_files, npartitions=20) fixes the problem (setting partition_size would probably also work)

This workaround let you finish preping the dataset?
I am hesitant to set a lower number like 20 as a default since dask defaults to "about 100", see https://docs.dask.org/en/stable/bag-creation.html?highlight=npartitions#db-from-sequence. It's not clear to me if lowering this number to 20 would slow down parallel processing for people that might have data with different characteristics (e.g. smaller audio files).

The best would be to automatically figure out the number of workers and memory available and set npartitions/partition_size accordingly. Unfortunately, I cannot see a way to do that without additional dependencies.

Could you point me to what you are looking at for automatic optimization of npartitions? If it's just something like depending on dask[all] instead of dask[bag] I could be fine with that but need to think it over.

Thanks again, happy to help make this easier for you. I can add the npartitions option myself or accept a pull request if you are interested.

@kalleknast
Copy link
Author

kalleknast commented Oct 11, 2022

Dask uses the multiprocessing package (i.e. no extra dependencies). We can get the number of cups from that: multiprocessing.cpu_count().

In audio.py:
adding to line 4:

import multiprocessing

and changing lines 229-230 to:

    npartitions = multiprocessing.cpu_count() - 1
    bag = db.from_sequence(audio_files, npartitions=npartitions)

Preparing 14.7 GB of WAVs to 83.2 GB of spectrograms took 11m 46s.

@kalleknast
Copy link
Author

I tried by setting partition_size to 3% of the available memory too. Increasing the partition_size beyond 3% of available memory results in out of memory errors.

import psutil
partition_size = int(.03*psutil.virtual_memory().available)
bag = db.from_sequence(audio_files, partition_size=partition_size)

However, the computation took way longer 32m 20s instead of 11m 46s.

@NickleDave
Copy link
Collaborator

Thank you @kalleknast this is helpful.

I am working on understanding what's going on here a little better.

Can I ask you to do a couple things so I can help you?

  • replicate your original error and then paste the full traceback here so I can get a better sense of what's going on when it crashes? E.g., formatted with triple backticks so it's searchable and reads as console output at the same time
  • give me some measure of the size of your individual audio files and how many files there are. The times and total sizes are very helpful but it would also be really good to know if there's some difference in individual file size that makes it necessary to change npartitions or partition_size

@NickleDave
Copy link
Collaborator

Also can I ask what operating system you are on?

I'm wondering if it's Windows, which in the past is the platform where I have run into more dask errors. I searched dask issues and this is the only one where I found the same error message, and it's Windows related:
dask/dask#8506

Nothing against Windows--I would love the code to work there--I'm just trying to get to the root of the error

@kalleknast
Copy link
Author

System: Ubuntu 20.04.5 LTS with 62.7 GM memory

Files: 107 wav files sampled at 96kHz with sizes between 13.4 and 212.6 MB (most around 130 MB)

Traceback:

$ vak prep train_config.toml 
2022-10-14 22:59:43,745 - vak.cli.prep - INFO - Determined that purpose of config file is: train.
Will add 'csv_path' option to 'TRAIN' section.
2022-10-14 22:59:43,745 - vak.core.prep - INFO - purpose for dataset: train
2022-10-14 22:59:43,745 - vak.core.prep - INFO - will split dataset
2022-10-14 22:59:44,315 - vak.io.dataframe - INFO - making array files containing spectrograms from audio files in: data/WAV
2022-10-14 22:59:44,319 - vak.io.audio - INFO - creating array files with spectrograms
[                                        ] | 0% Completed | 12.69 sms
Traceback (most recent call last):
  File "/home/hjalmar/callclass/bin/vak", line 8, in <module>
    sys.exit(main())
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/__main__.py", line 45, in main
    cli.cli(command=args.command, config_file=args.configfile)
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/cli/cli.py", line 30, in cli
    COMMAND_FUNCTION_MAP[command](toml_path=config_file)
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/cli/prep.py", line 132, in prep
    vak_df, csv_path = core.prep(
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/core/prep.py", line 201, in prep
    vak_df = dataframe.from_files(
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/io/dataframe.py", line 134, in from_files
    spect_files = audio.to_spect(
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/io/audio.py", line 236, in to_spect
    spect_files = list(bag.map(_spect_file))
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/dask/bag/core.py", line 1480, in __iter__
    return iter(self.compute())
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/dask/base.py", line 315, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/dask/base.py", line 600, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/dask/multiprocessing.py", line 233, in get
    result = get_async(
  File "/home/hjalmar/callclass/lib/python3.8/site-packages/dask/local.py", line 500, in get_async
    for key, res_info, failed in queue_get(queue).result():
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

@NickleDave
Copy link
Collaborator

That's perfect, thanks so much. Will keep doing homework and let you know what I find out in the next couple of days

@NickleDave
Copy link
Collaborator

Just updating that I asked a question in the dask forum here:
https://dask.discourse.group/t/how-to-troubleshoot-optimize-n-partitions-partition-size-for-dask-bag/1216/2

@NickleDave
Copy link
Collaborator

Hi again @kalleknast just checking back -- I never got a reply on the dask forum.

Any chance you'd be willing to share data so I could replicate and test some of the different approaches here? I don't have any audio files that are quite that size right at hand although I could probably find some -- I think some of the USV files end up being pretty big because of the high sampling rate, e.g. https://zenodo.org/record/3428024

You could contact me at nicholdav at gmail if that's easier. Thanks!

@kalleknast
Copy link
Author

Hi @NickleDave. I just sent you a link to a part of the data set.

@NickleDave
Copy link
Collaborator

Awesome, thank you @kalleknast!

I will download this weekend to verify I can access, and see if I can replicate the issue on my machine.

I'm catching up on the backlog here and a bit busy at my day job right now, but hoping to get to this over the Thanksgiving holiday at the latest.

I will ask you to test any fix on your machine and of course add you as a contributor if we end up merging something.
Very appreciated!

@NickleDave NickleDave changed the title ENH: Out of memory when preparing the dataset (computing spectrograms) ENH: Optimize dask.bag for file sizes + add dask.bag options to config Nov 5, 2022
@NickleDave
Copy link
Collaborator

Confirming I was able to download the files and replicate this issue, thank you @kalleknast -- I've got this on the to-do list for features/enhancements now

@NickleDave
Copy link
Collaborator

Hi @kalleknast just want to let you know we didn't forget about you.
This is still on the to-do list.

@yardencsGitHub has me working on other things (#605) but this affects his group and we know it's an issue for people working with large files due to high sample rates, e.g. bat calls. So I will get to it

@kalleknast
Copy link
Author

Hi @NickleDave
Try my fix on many small files. It works fine on my large files. It should be good if it works on small too.
In audio.py:

import multiprocessing

replace line 229 in the original

bag = db.from_sequence(audio_files)

with

    npartitions = multiprocessing.cpu_count()
    bag = db.from_sequence(audio_files, npartitions=npartitions)

@NickleDave
Copy link
Collaborator

Thank you @kalleknast I will test this.

Should have a chance this weekend.
If it works fine I will just add the option in the config as we discussed and raise an issue to revisit in more detail in version 1.0.

That way we can get things working for you sooner.

@NickleDave
Copy link
Collaborator

NickleDave commented Feb 10, 2023

@kalleknast just icymi I did release a version 0.8.0 today that includes this
https://github.com/vocalpy/vak/releases/tag/0.8.0

You should be able to pip install now and install off conda-forge sometime tomorrow.
Thanks again for your help with this.

@kalleknast
Copy link
Author

Worked perfectly!

Thanks

@NickleDave
Copy link
Collaborator

Awesome, glad to hear it

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ENH: enhancement enhancement; new feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants