per-worker contrib.concurrent progress #973

Open
5 of 8 tasks
Chrismarsh opened this issue May 16, 2020 · 7 comments

Labels
help wanted 🙏 We need you (discussion or implementation)

Comments

@Chrismarsh

Chrismarsh commented May 16, 2020

  • I have marked all applicable categories:
    • exception-raising bug
    • visual output bug
    • documentation request (i.e. "X is missing from the documentation." If instead I want to ask "how to use X?" I understand [StackOverflow#tqdm] is more appropriate)
    • new feature request
  • I have visited the [source website], and in particular
    read the [known issues]
  • I have searched through the [issue tracker] for duplicates
  • I have mentioned version numbers, operating system and
    environment, where applicable:
import tqdm, sys
print(tqdm.__version__, sys.version, sys.platform)
4.45.0 3.7.6 (default, Feb 13 2020, 11:50:51)
[Clang 11.0.0 (clang-1100.0.33.16)] darwin

There is a gap in the documentation regarding how to use tqdm with concurrent.futures ProcessPoolExecutor pools.

The documentation and contrib.concurrent cover the case where multiple workers chip away at a large problem with a small chunksize, so the progress bar can be updated as each worker finishes a task.

However, for the case of a few workers and a large chunksize, it is unclear how to update a progress bar from within the future. Specifically, I have a workload with perhaps 3 million elements to process, and the processing is compute- and IO-heavy: open a file and do work on it. I therefore use large chunksizes, O(10,000), for efficiency (this limits how many times each file is opened).

For example:

from concurrent import futures
from time import sleep

L = range(100000)

def foo(element_id):
    sleep(0.1)  # replace with something compute-heavy on element_id

    # I would like to do something like this:
    # tqdm.update(1)
    return element_id, element_id + element_id

with futures.ProcessPoolExecutor(max_workers=16) as executor:
    executor.map(foo, L, chunksize=len(L) // 16)

The documentation suggests the following inside each worker:

for _ in trange(total, desc=text, position=n):
    sleep(interval)

However, this doesn't work: if sleep(interval) is replaced with the actual workload, the workload simply gets executed total times too many.

@casperdcl
Sponsor Member

I'm sorry but I don't understand. Is this a question? Is it a statement? A bug report? A feature request? It would be helpful if you could say something like:

code:

from tqdm.contrib.concurrent import process_map
process_map(foo, L, max_workers=16, chunksize=len(L) // 16)

current output:

some stuff i don't like

expected output:

some stuff i like

@casperdcl casperdcl self-assigned this May 16, 2020
@casperdcl casperdcl added the need-feedback 📢 We need your response (question) label May 16, 2020
@Chrismarsh
Author

Oh, sorry!
It's a question + feedback that the documentation is unclear.

If I do this:

from time import sleep
from tqdm.contrib.concurrent import process_map

L = range(10000)

def foo(element_id):
    sleep(0.01)  # replace with something compute-heavy
    return element_id, element_id + element_id

ret = process_map(foo, L, max_workers=16, chunksize=len(L) // 16)

the progress bar stays at 0% and then jumps to 100%.

This occurs because the wrapper (like the method in the documentation) wraps the returned futures in the tqdm progress bar. However, some workloads run best when n futures are spawned and each works on a large chunk. If the workload is fairly even across workers, they all terminate at about the same time, causing the progress bar to jump from 0% to 100%.

I would like to update the progress bar from within a future to show how the per-worker progress is going.
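
For illustration, here is a minimal sketch of one possible workaround (none of these names are tqdm API): have each worker push a token onto a multiprocessing Manager queue per finished element, and let the parent drain the queue to drive a single overall bar:

from concurrent import futures
from multiprocessing import Manager
from time import sleep

from tqdm import tqdm

def foo(element_id, queue):
    sleep(0.01)   # stand-in for the real per-element work
    queue.put(1)  # report one finished element to the parent
    return element_id, element_id + element_id

if __name__ == "__main__":
    L = range(10000)
    with Manager() as manager:
        queue = manager.Queue()
        with futures.ProcessPoolExecutor(max_workers=16) as executor:
            future_results = executor.map(foo, L, [queue] * len(L),
                                          chunksize=len(L) // 16)
            with tqdm(total=len(L)) as pbar:
                for _ in range(len(L)):
                    queue.get()  # blocks until a worker reports an element
                    pbar.update(1)
            results = list(future_results)

This keeps the executor's large chunksize while the bar still advances per element.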

@mlissner

I'm running into this same thing, to the extent that I thought tqdm was broken. I have a chunksize of 1,000, so the progress bar only updates when a worker completes 1,000 items. That takes a while, and in the meantime it looks like tqdm is doing nothing: I know the system is doing work (thanks, htop), but the progress bar isn't moving.

@casperdcl casperdcl changed the title "Updating from within concurrent.futures ProcessPool" to "per-worker contrib.concurrent progress" Aug 22, 2021
@casperdcl
Sponsor Member

casperdcl commented Aug 22, 2021

Ah well, if the number of running processes/threads == the number of tasks, then yes, all tasks will finish simultaneously.

Nested progress (each process/thread reports its own progress) needs to be manually implemented by users.
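
A rough sketch of such a manual implementation, following the multiprocessing pattern in tqdm's docs (note `_identity` is a private multiprocessing attribute, used here only to pick a distinct `position` row per worker):

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import RLock, current_process
from time import sleep

from tqdm import tqdm

def process_chunk(chunk):
    # each worker process gets a stable 1-based identity; use it as the bar row
    position = current_process()._identity[0]
    results = []
    for element_id in tqdm(chunk, desc=f"worker {position}",
                           position=position, leave=False):
        sleep(0.01)  # stand-in for the real per-element work
        results.append((element_id, element_id + element_id))
    return results

if __name__ == "__main__":
    L = list(range(1000))
    n_workers = 4
    chunks = [L[i::n_workers] for i in range(n_workers)]  # one big chunk each
    # share tqdm's write lock with the workers so bars don't clobber each other
    with ProcessPoolExecutor(max_workers=n_workers,
                             initializer=tqdm.set_lock,
                             initargs=(RLock(),)) as ex:
        results = [r for chunk_results in ex.map(process_chunk, chunks)
                   for r in chunk_results]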

@retorquere

Is there an example of how to implement this manually? I'm running into the same chunksize problem.

@casperdcl casperdcl removed their assignment Jun 18, 2022
@casperdcl casperdcl added help wanted 🙏 We need you (discussion or implementation) and removed need-feedback 📢 We need your response (question) labels Jun 18, 2022
@eggplants

I have tried this:

  • thread_map: successful (2023-01-26.11.36.23.mov)

import time

from tqdm import tqdm
from tqdm.contrib.concurrent import thread_map

def sleep(duration: int) -> None:
    for _ in tqdm(range(duration), leave=False):
        time.sleep(1)

if __name__ == "__main__":
    thread_map(sleep, list(range(10)), max_workers=4, leave=False, desc="[ROOT]")

  • process_map: failed (2023-01-26.11.38.58.mov)

import time

from tqdm import tqdm
from tqdm.contrib.concurrent import process_map

def sleep(duration: int) -> None:
    for _ in tqdm(range(duration), leave=False):
        time.sleep(1)

if __name__ == "__main__":
    process_map(sleep, list(range(10)), max_workers=4, leave=False, desc="[ROOT]")
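
Presumably the process_map case fails because the nested bars in the worker processes all draw at the same default screen position, so they overwrite one another; pinning each worker's bar to its own position row (as in the manual sketch above) seems to be required for processes.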

@NyaMisty

Actually, one can simply monkeypatch the internal _executor_map function and use the fut.add_done_callback() hook provided by the executor to solve this issue:

from concurrent.futures import ThreadPoolExecutor

def _executor_map(PoolExecutor, fn, *iterables, **tqdm_kwargs):
    """
    Implementation of `thread_map` and `process_map`.

    Parameters
    ----------
    tqdm_class  : [default: tqdm.auto.tqdm].
    max_workers  : [default: min(32, cpu_count() + 4)].
    chunksize  : [default: 1].
    lock_name  : [default: "":str].
    """
    from tqdm.contrib.concurrent import ensure_lock, length_hint, tqdm_auto, cpu_count
    kwargs = tqdm_kwargs.copy()
    if "total" not in kwargs:
        kwargs["total"] = length_hint(iterables[0])
    tqdm_class = kwargs.pop("tqdm_class", tqdm_auto)
    max_workers = kwargs.pop("max_workers", min(32, cpu_count() + 4))
    chunksize = kwargs.pop("chunksize", 1)
    lock_name = kwargs.pop("lock_name", "")
    with ensure_lock(tqdm_class, lock_name=lock_name) as lk:
        # share lock in case workers are already using `tqdm`
        with PoolExecutor(max_workers=max_workers, initializer=tqdm_class.set_lock,
                          initargs=(lk,)) as ex:
            with tqdm_class(**kwargs) as pbar:
                # patch `submit` so that every future bumps the bar as soon
                # as it completes, instead of waiting for `map` to return
                orisubmit = ex.submit
                def patchsubmit(*args, **kwargs):
                    fut = orisubmit(*args, **kwargs)
                    fut.add_done_callback(lambda _: pbar.update())
                    return fut
                ex.submit = patchsubmit
                return list(ex.map(fn, *iterables, chunksize=chunksize))

def thread_map(fn, *iterables, **tqdm_kwargs):
    return _executor_map(ThreadPoolExecutor, fn, *iterables, **tqdm_kwargs)
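
For what it's worth, the patched thread_map above can then be driven like the stock one; since ThreadPoolExecutor submits one future per item (chunksize has no effect for threads), the bar should tick once per completed element:

from time import sleep

def foo(element_id):
    sleep(0.01)  # stand-in for real work
    return element_id, element_id + element_id

results = thread_map(foo, range(10000), max_workers=16)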
