diff --git a/tqdm/contrib/concurrent.py b/tqdm/contrib/concurrent.py index cd81d622a..4ad75011c 100644 --- a/tqdm/contrib/concurrent.py +++ b/tqdm/contrib/concurrent.py @@ -34,6 +34,7 @@ def _executor_map(PoolExecutor, fn, *iterables, **tqdm_kwargs): ---------- tqdm_class : [default: tqdm.auto.tqdm]. max_workers : [default: min(32, cpu_count() + 4)]. + timeout : [default: None]. chunksize : [default: 1]. lock_name : [default: "":str]. """ @@ -42,13 +43,14 @@ def _executor_map(PoolExecutor, fn, *iterables, **tqdm_kwargs): kwargs["total"] = length_hint(iterables[0]) tqdm_class = kwargs.pop("tqdm_class", tqdm_auto) max_workers = kwargs.pop("max_workers", min(32, cpu_count() + 4)) + timeout = kwargs.pop("timeout", None) chunksize = kwargs.pop("chunksize", 1) lock_name = kwargs.pop("lock_name", "") with ensure_lock(tqdm_class, lock_name=lock_name) as lk: # share lock in case workers are already using `tqdm` with PoolExecutor(max_workers=max_workers, initializer=tqdm_class.set_lock, initargs=(lk,)) as ex: - return list(tqdm_class(ex.map(fn, *iterables, chunksize=chunksize), **kwargs)) + return list(tqdm_class(ex.map(fn, *iterables, timeout=timeout, chunksize=chunksize), **kwargs)) def thread_map(fn, *iterables, **tqdm_kwargs): @@ -64,6 +66,10 @@ def thread_map(fn, *iterables, **tqdm_kwargs): Maximum number of workers to spawn; passed to `concurrent.futures.ThreadPoolExecutor.__init__`. [default: max(32, cpu_count() + 4)]. + timeout : int or float, optional + The iterator raises a TimeoutError if __next()__ is called and the + result isn't available within the timeout specified from the + original call to thread_map. [default: None]. """ from concurrent.futures import ThreadPoolExecutor return _executor_map(ThreadPoolExecutor, fn, *iterables, **tqdm_kwargs) @@ -82,6 +88,10 @@ def process_map(fn, *iterables, **tqdm_kwargs): Maximum number of workers to spawn; passed to `concurrent.futures.ProcessPoolExecutor.__init__`. [default: min(32, cpu_count() + 4)]. + timeout : int or float, optional + The iterator raises a TimeoutError if __next()__ is called and the + result isn't available within the timeout specified from the + original call to process_map. [default: None]. chunksize : int, optional Size of chunks sent to worker processes; passed to `concurrent.futures.ProcessPoolExecutor.map`. [default: 1].