
Parallelism safety (thread, multiprocessing) #329

Merged: 11 commits merged into master on Sep 26, 2017

Conversation

@lrq3000 (Member) commented Dec 28, 2016

Implement parallelism safety in tqdm by providing a new set_lock() class method. This is a follow-up on #291.

For Linux (and any platform supporting fork), no action is required from the user.

For Windows, here is the canonical example usage:

from time import sleep
from tqdm import tqdm
from multiprocessing import Pool, freeze_support, Lock

def progresser(n):         
    text = "bar{}".format(n)
    for i in tqdm(range(5000), desc=text, position=n, leave=True):
        sleep(0.001)

def init_child(write_lock):
    """
    Provide tqdm with the lock from the parent app.
    This is necessary on Windows to avoid race conditions.
    """
    tqdm.set_lock(write_lock)

if __name__ == '__main__':
    freeze_support()
    write_lock = Lock()
    L = list(range(10))
    Pool(len(L), initializer=init_child, initargs=(write_lock,)).map(progresser, L)

Todo:

  • Unit test (using squash_ctrl and a fake IO; just check that there are 10 lines at the end, each with a different number, just like the example case provided) with the sample code above, plus mp.set_start_method('spawn') to force mimicking Windows' "no-fork" spawning of processes. Otherwise it's impossible to test on Linux.
  • Flake8
  • Add to the documentation how to use tqdm with locks on Windows (it should be transparent on Linux).
  • LIMITATION: tqdm.write() won't work, because there is no way to implement it without a centralized manager that is aware of all bars. See Multi tqdm #143 for a possible solution. Or maybe we can change the _instances type to a multiprocessing.Queue() (or another type that is shareable across processes)? A rough queue-based sketch follows this list.
  • Update the documentation (e.g. Documentation revision: AttributeError: 'DummyTqdmFile' object has no attribute 'flush' #439).
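
A rough, untested sketch (not part of this PR) of the queue-based idea in the last item above: children push their messages onto a Manager queue instead of calling tqdm.write(), and the parent prints them from a listener thread. It does not reposition the children's bars, so some visual overlap is still possible; worker and printer are just placeholder names.

from threading import Thread
from time import sleep
from multiprocessing import Pool, Manager, Lock, freeze_support
from tqdm import tqdm

def worker(args):
    n, msg_queue = args
    for i in tqdm(range(100), desc="bar{}".format(n), position=n):
        sleep(0.01)
        if i == 50:
            # instead of tqdm.write()
            msg_queue.put("bar{} is half done".format(n))

def init_child(write_lock):
    tqdm.set_lock(write_lock)

def printer(msg_queue):
    """Parent-side loop: print children's messages until a None sentinel arrives."""
    for msg in iter(msg_queue.get, None):
        print(msg)

if __name__ == '__main__':
    freeze_support()
    manager = Manager()
    msg_queue = manager.Queue()
    listener = Thread(target=printer, args=(msg_queue,))
    listener.start()
    L = list(range(4))
    pool = Pool(len(L), initializer=init_child, initargs=(Lock(),))
    pool.map(worker, [(n, msg_queue) for n in L])
    pool.close()
    pool.join()
    msg_queue.put(None)  # stop the listener thread
    listener.join()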

@lrq3000 lrq3000 added need-feedback 📢 We need your response (question) p0-bug-critical ☢ Exception rasing labels Dec 28, 2016
@lrq3000 lrq3000 added this to the v5.0.0 milestone Dec 28, 2016
@lrq3000 (Member Author) commented Dec 28, 2016

Note that there is a minor display glitch: when the first bar reaches 100% and close() is called, a newline is appended and all subsequent bars are displaced one line below.

This is caused by this line of code, combined with the fact that the _instances list is not shared across the child processes (otherwise we could work around it, but here we can't). This is not the first time this happens (in fact it happens whenever we use multiple bars), and I think this "special closing rule" makes tqdm's behavior less consistent, which is bad for future maintenance and for bug hunting.

I did not want to remove this here without a discussion; that should be done in another PR. What do you think @casperdcl? If we remove this line of code, users will have to print("\n") themselves after their tqdm loops, which is a bit more cumbersome, but it will work consistently even when using multiple bars (parallel or sequential).

@lrq3000 (Member Author) commented Dec 29, 2016

I can't get a unit test to work, simply because I can't find a way to get the printed result. @casperdcl, do you have any idea how to get the sys.stderr output from forked children (maybe you can reuse the tricks you used for the CLI tests)?

Here is my unit test so far (yes, progresser() and init_child() need to stay outside of the test):

def progresser(args):
    """ Children job """
    n, our_file = args  # unpack arguments
    text = "bar{}".format(n)
    for i in tqdm(range(100), file=our_file, miniters=1, mininterval=0,
                  desc=text, position=n+1, leave=True, bar_format='{l_bar}'):
        pass


def init_child(write_lock):
    """ Provide lock from parent to children """
    tqdm.set_lock(write_lock)


@with_setup(pretest, posttest)
def test_parallel():
    """ Test multiprocessing parallel safety """

    # force mimicking of Windows' "no-fork" spawning of processes (children are isolated)
    # set_start_method is only available on Python >= 3.4
    try:
        multiprocessing.set_start_method('spawn')
    except AttributeError:
        pass

    #multiprocessing.freeze_support()
    write_lock = multiprocessing.Lock()
    with closing(StringIO()) as our_file:
        L = list(range(3))  # define 3 tqdm bars
        pool = multiprocessing.Pool(len(L), initializer=init_child, initargs=(write_lock,))
        try:
            pool.map(progresser, zip(L, [our_file] * len(L)))
        finally:
            pool.close()
            pool.join()
        res = squash_ctrlchars(our_file.getvalue())
        assert res == ['pos0 bar:   0%|', 'pos1 bar:   0%|']
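
One possible workaround (an untested sketch, not part of this PR; progresser_file and run_test_pool are hypothetical names, and init_child is the initializer from the draft above): a StringIO cannot be shared with the children, since each child only writes to its own copy, but a real file on disk can be shared if every child re-opens the same path. The parent then reads the file back and feeds it to squash_ctrlchars().

import os
import tempfile
import multiprocessing

from tqdm import tqdm

def progresser_file(args):
    """Child job: write one bar to the shared file path."""
    n, path = args
    with open(path, 'a') as our_file:
        for _ in tqdm(range(100), file=our_file, miniters=1, mininterval=0,
                      desc="bar{}".format(n), position=n, leave=True,
                      bar_format='{l_bar}'):
            pass

def run_test_pool(n_bars=3):
    """Run n_bars child bars against one shared file and return its contents."""
    write_lock = multiprocessing.Lock()
    fd, path = tempfile.mkstemp()
    os.close(fd)
    try:
        pool = multiprocessing.Pool(n_bars, initializer=init_child,
                                    initargs=(write_lock,))
        try:
            pool.map(progresser_file, [(n, path) for n in range(n_bars)])
        finally:
            pool.close()
            pool.join()
        with open(path) as f:
            return f.read()  # pass this to squash_ctrlchars() in the assert
    finally:
        os.remove(path)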

@lrq3000 lrq3000 mentioned this pull request Dec 31, 2016
th_lock = th.Lock() # thread lock


class TqdmDefaultWriteLock(object):
Contributor: Is the Tqdm prefix needed?

Member Author: It's just to follow the nomenclature we had (e.g. TqdmDeprecationWarning).

@@ -43,6 +46,34 @@ def __init__(self, msg, fp_write=None, *a, **k):
super(TqdmDeprecationWarning, self).__init__(msg, *a, **k)


# Create global parallelism locks to avoid racing issues with parallel bars
# works only if fork available (Linux, MacOSX, but not on Windows)
Contributor: macOS now

@CrazyPython (Contributor) left a comment:

I don't want to sound pushy, but I think the PyCharm editor is superior to Notepad++. I can't vouch for the other IntelliJ IDEs, but PyCharm is good. For example, flake8 is automatically run on files and Cmd-Shift-L fixes errors; there's also automatic variable renaming.

tqdm/_tqdm.py Outdated
1 / self.avg_time if self.avg_time else None,
self.bar_format))
# Print bar's update
self.sp(self.format_meter(
Contributor: The spacing here is not so good.

@lrq3000 (Member Author) commented Jan 5, 2017

Thanks @CrazyPython, I will implement your fixes soon. About PyCharm: I need something very lightweight, as I have hundreds of tabs open at once!

@gwerbin commented Jan 24, 2017

Is this good to go apart from the stylistic issues? I can fix those.

@lrq3000 (Member Author) commented Jan 24, 2017 via email

@CrazyPython (Contributor):

@lrq3000 check out The Great Suspender then. It lowers the memory usage of tabs by suspending them.

@lrq3000 (Member Author) commented Jan 25, 2017 via email

@casperdcl (Sponsor Member) commented Jan 26, 2017

a few thoughts

  • if tqdm.write is not supported, I'm not sure there's much point, because:

(1)

def progresser(n):         
    text = "bar{}".format(n)
    for i in tqdm(range(5000), desc=text, position=n, leave=True):
        sleep(0.001)

def init_child(write_lock):
    """
    Provide tqdm with the lock from the parent app.
    This is necessary on Windows to avoid race conditions.
    """
    tqdm.set_lock(write_lock)

if __name__ == '__main__':
    freeze_support()
    write_lock = Lock()
    L = list(range(10))
    Pool(len(L), initializer=init_child, initargs=(write_lock,)).map(progresser, L)

could be replaced with:

(2)

def progresser(write_lock, n):
    tqdm.set_lock(write_lock)
    text = "bar{}".format(n)
    for i in tqdm(range(5000), desc=text, position=n, leave=True):
        sleep(0.001)

if __name__ == '__main__':
    freeze_support()
    write_lock = Lock()
    L = list(range(10))
    procs = [Process(target=progresser, args=(write_lock, i)) for i in L]
    [p.start() for p in procs]
    [p.join() for p in procs]

which in turn becomes:

(3)

def progresser(lk, n):
    lk.acquire()
    with tqdm(total=5000, desc="bar " + str(n), position=n) as t:
        lk.release()
        def update():
            lk.acquire()
            t.update()
            lk.release()
        for i in range(5000):
            sleep(0.001)
            update()

if __name__ == '__main__':
    freeze_support()
    write_lock = Lock()
    L = list(range(10))
    procs = [Process(target=progresser, args=(write_lock, i)) for i in L]
    [p.start() for p in procs]
    [p.join() for p in procs]

Incidentally, the last example is how I've been using tqdm in my parallel code. Without this PR, (3) is only 8 more lines of code than with (2), or 5 more if we were using a manual update rather than trange in (2). I feel like we could do slightly more intelligent things in the internals.
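
A small aside on (3): a multiprocessing.Lock is itself a context manager, so the manual acquire()/release() pairs can be written as with-blocks. This is only a restyling of (3) for readability, not tqdm code:

from time import sleep
from tqdm import tqdm

def progresser(lk, n):
    with lk:  # lock held only while tqdm touches the terminal
        t = tqdm(total=5000, desc="bar " + str(n), position=n)
    for _ in range(5000):
        sleep(0.001)
        with lk:
            t.update()
    with lk:
        t.close()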

@lrq3000 (Member Author) commented Jan 29, 2017

@casperdcl I agree it can be done from outside, but then we're outsourcing the complexity of using tqdm in parallel to the user. Also, the PR does fix parallelism automatically on Linux (and probably macOS); it's only on Windows that it cannot do so (because there is no fork), so the PR provides a unified and simplified way to just provide a lock, and tqdm does the rest.

So yes, the PR provides a simplified and unified API to support parallelism, although it does not provide all functionality (notably tqdm.write(), but there is no way around that; your code snippets do not provide it either, and the only way would be a central manager as in #143).

@lrq3000 (Member Author) commented Jan 29, 2017

PS: also, with solution 3 we lose the speed advantage, since we cannot support iterable-based tqdm. And please keep in mind that the progresser function is not meant to be only about tqdm; it's a dummy function that represents the user's parallel function. Managing tqdm parallelism in a real user function will be messier than in the solution 3 example, whereas with the PR it will always look the same.

Just my two cents anyway, I don't use parallelism much.

@casperdcl (Sponsor Member):

I guess. Maybe we need more feedback from people who do use parallel code a lot, to see whether they have any suggestions/preferences.

btw, the random places tqdm finds itself... https://media.readthedocs.org/pdf/bigartm/latest/bigartm.pdf (dependencies: boost, numpy, pandas, protobuf... and tqdm).

@lrq3000 (Member Author) commented Jan 29, 2017 via email

@pyeguy commented Jan 31, 2017

I'm a bit of a novice, so this is just my 2¢...

but I usually use multiprocessing for concurrency in Python b/c my tasks are usually CPU-bound. Normally I either subclass multiprocessing.Process and override the run method, a la:

from multiprocessing import Process, Queue

class my_worker(Process):
    def __init__(self, output_queue, chunk_to_do):
        super().__init__()
        self.output_queue = output_queue
        self.chunk_to_do = chunk_to_do

    def run(self):
        for thing in self.chunk_to_do:
            self.output_queue.put(hard_math(thing))

if __name__ == "__main__":
    q = Queue()

    def chunker(longlist, size=100):
        # yield successive chunks of the input list
        for i in range(0, len(longlist), size):
            yield longlist[i:i + size]

    workers = []
    for chunk in chunker(longlist):
        p = my_worker(q, chunk)
        p.start()
        workers.append(p)

    for worker in workers:
        worker.join()

    # do stuff with data...

or lately I've been fooling around with the high-level wrapper library concurrent.futures b/c it feels a little cleaner for simpler tasks, which I was commenting on in issue #97

the way I've been using it is along the lines of:

import functools
from concurrent.futures import ProcessPoolExecutor

def hard_math(arg1, arg2):
    return result

if __name__ == "__main__":
    to_proc = [x for x in things_to_do_math_on]
    CHUNKSIZE = len(to_proc) // N
    with ProcessPoolExecutor(max_workers=N) as e:
        emap = e.map(functools.partial(hard_math, arg2=arg2),
                     to_proc, chunksize=CHUNKSIZE)
        output = list(emap)

    # do stuff with data

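For completeness, and unrelated to this PR's per-process bars: when the parent only needs one overall bar for an executor, it can wrap the results iterator directly, and no lock is needed because only the parent prints. do_work and items below are placeholders.

from concurrent.futures import ProcessPoolExecutor

from tqdm import tqdm

def do_work(x):
    return x * x

if __name__ == "__main__":
    items = list(range(1000))
    with ProcessPoolExecutor(max_workers=4) as executor:
        # one bar in the parent, updated as results come back
        results = list(tqdm(executor.map(do_work, items), total=len(items)))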

@Letme commented Mar 14, 2017

Can we get this merged in?

@lrq3000 (Member Author) commented Sep 23, 2017

Thanks a lot @casperdcl :-) Maybe you should double-check the two code blocks I commented on in this commit; I am not sure what the best thing to do about them is. If you have already tested the cases I mention, then it should be fine :-)

casperdcl added a commit that referenced this pull request Sep 24, 2017
pass
cls.monitor = None
else:
cls.monitor = None
Sponsor Member: @lrq3000 happy with this?

Member Author: yes :-) thanks!

@casperdcl (Sponsor Member):
@lrq3000 it would be great if you could post a quick unit test as you described:

Unit test (using squash_ctrl and a fake IO, just check that there are 10 lines at the end, each with a different number, just like the example case provided) with the sample code above as a unit test + with mp.set_start_method('spawn') to force mimicking Windows "no-fork" spawning of processes. Else it's impossible to test on Linux.

examples/parallel_bars.py should be a decent start.

Coverage is currently still 100%, but that doesn't prove it always works.

@lrq3000 (Member Author) left a comment:

ok!


@lrq3000 (Member Author) commented Sep 24, 2017

@casperdcl I would love to, but unfortunately I don't have the time right now, sorry :-( Also, I'm not sure that mp.set_start_method('spawn') will force mimicking Windows' "no-fork" behavior; I just read that somewhere on SO, so this will probably need some trial and error (or it may not work at all; we might not be able to simulate Windows on Travis...).
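
For what it's worth, a hedged alternative (untested here): on Python >= 3.4, multiprocessing.get_context('spawn') gives a spawn-based Pool for a single test without touching the global start method, which set_start_method() only lets you set once per process. Whether spawn fully mimics Windows on Travis still needs to be verified; make_spawn_pool is a hypothetical helper and init_child is the initializer from the draft test above.

import multiprocessing

def make_spawn_pool(n_workers):
    """Return a Pool whose workers are started with the 'spawn' method."""
    ctx = multiprocessing.get_context('spawn')
    write_lock = ctx.Lock()  # create the lock from the same (spawn) context
    return ctx.Pool(n_workers, initializer=init_child, initargs=(write_lock,))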

@casperdcl casperdcl merged commit 4278974 into master Sep 26, 2017
casperdcl added a commit that referenced this pull request Sep 26, 2017
fixes #285 -> #291 -> #329
fixes #422
fixes #439

fixes #323
fixes #324
fixes #334
fixes #407
fixes #418

related to:
- #97
- #143
- #331
- #361
- #384
- #385
- #417
@casperdcl mentioned this pull request Sep 26, 2017
@casperdcl deleted the parallel-print-fix2 branch September 26, 2017 10:37
@IceflowRE commented Sep 28, 2017

@Illuminae Which OS? Are you using a specific terminal? (e.g. PyCharm has problems)


Everything that follows was done on Win10 64-bit, and I will dig deeper at the weekend; I don't have the time currently. So here is just a small statement.

I think these glitches are not the result of this PR (but I don't know, so I will point them out here).
I used the nested progress bar code from the README.

from time import sleep
from tqdm import tqdm, trange
from multiprocessing import Pool, freeze_support, Lock

L = list(range(9))

def progresser(n):
    interval = 0.001 / (n + 2)
    total = 5000
    text = "#{}, est. {:<04.2}s".format(n, interval * total)
    for i in trange(total, desc=text, position=n):
        sleep(interval)

if __name__ == '__main__':
    freeze_support()  # for Windows support
    p = Pool(len(L),
             # again, for Windows support
             initializer=tqdm.set_lock, initargs=(Lock(),))
    p.map(progresser, L)
    print("\n" * (len(L) - 2))

With Windows PowerShell I get these or similar results:

PS D:\Iceflower\programming\tqdm> python test.py
#0, est. 2.50s: 100%|███████████████████████████████████████████████████████████████| 5000/5000 [00:05<00:00, 837.04it/s]
#1, est. 1.70s:  98%|█████████████████████████████████████████████████████████████▌ | 4889/5000 [00:05<00:00, 821.38it/s]
#1, est. 1.70s: 100%|███████████████████████████████████████████████████████████████| 5000/5000 [00:06<00:00, 829.86it/s]
#2, est. 1.20s: 100%|███████████████████████████████████████████████████████████████| 5000/5000 [00:06<00:00, 823.93it/s]
#3, est. 1.00s: 100%|███████████████████████████████████████████████████████████████| 5000/5000 [00:06<00:00, 827.37it/s]
#4, est. 0.83s: 100%|███████████████████████████████████████████████████████████████| 5000/5000 [00:06<00:00, 811.25it/s]
#5, est. 0.71s: 100%|███████████████████████████████████████████████████████████████| 5000/5000 [00:06<00:00, 812.10it/s]
#6, est. 0.62s: 100%|███████████████████████████████████████████████████████████████| 5000/5000 [00:06<00:00, 811.32it/s]
#7, est. 0.56s: 100%|███████████████████████████████████████████████████████████████| 5000/5000 [00:06<00:00, 803.09it/s]
PS D:\Iceflower\programming\tqdm> ██████████████████████████████████████████████████| 5000/5000 [00:06<00:00, 808.79it/s]

and in the command line:

D:\Iceflower\programming\tqdm>python test.py
#0, est. 2.50s: 100%|█████████████████████████████████████████████████████████████| 5000/5000 [00:05<00:00, 840.30it/s]
#1, est. 1.70s:  99%|███████████████████████████████████████████████████████████? | 4936/5000 [00:05<00:00, 838.44it/s]
#1, est. 1.70s: 100%|█████████████████████████████████████████████████████████████| 5000/5000 [00:05<00:00, 838.20it/s]
#2, est. 1.20s: 100%|█████████████████████████████████████████████████████████████| 5000/5000 [00:05<00:00, 836.62it/s]
#3, est. 1.00s: 100%|█████████████████████████████████████████████████████████████| 5000/5000 [00:06<00:00, 833.22it/s]
#4, est. 0.83s: 100%|█████████████████████████████████████████████████████████████| 5000/5000 [00:06<00:00, 823.69it/s]
#5, est. 0.71s: 100%|█████████████████████████████████████████████████████████████| 5000/5000 [00:06<00:00, 830.42it/s]
#6, est. 0.62s: 100%|█████████████████████████████████████████████████████████████| 5000/5000 [00:06<00:00, 821.60it/s]
#7, est. 0.56s: 100%|█████████████████████████████████████████████████████████████| 5000/5000 [00:06<00:00, 816.37it/s]
#8, est. 0.50s: 100%|█████████████████████████████████████████████████████████████| 5000/5000 [00:06<00:00, 823.41it/s]
D:\Iceflower\programming\tqdm>

I replaced the "tofu" characters with question marks.

Interesting here is that one bar gets doubled, the second bar is not full, and inside PowerShell the prompt's directory gets printed into the last bar.

EDIT: I just saw this could be the issue reported in #445?


PS. Pressing Ctrl+C leads to a KeyboardInterrupt and stays there. Spamming it results in "python has stopped working", but it won't end the main Python program. You have to close the terminal.

@Illuminae:
@IceflowRE I was using the regular Mac OS X Terminal, the server I was executing on runs Debian 8.9.

@casperdcl (Sponsor Member):

@Illuminae @IceflowRE see #454, maybe you just need ascii=True?

@dyno (Contributor) commented Nov 6, 2017

with a patched examples/parallel_bars.py (making the top bar run faster and thus finish earlier):

diff --git a/examples/parallel_bars.py b/examples/parallel_bars.py
index 98eb06e..04698a8 100644
--- a/examples/parallel_bars.py
+++ b/examples/parallel_bars.py
@@ -8,7 +8,7 @@ L = list(range(9))


 def progresser(n):
-    interval = 0.001 / (n + 2)
+    interval = 0.001 / (len(L) - n + 2)
     total = 5000
     text = "#{}, est. {:<04.2}s".format(n, interval * total)
     for _ in tqdm(range(total), desc=text, position=n):

I still see the same problem as @IceflowRE. Here is my running environment:

#  iTerm2 

$ uname -a
Darwin ip-10-50-40-199.ec2.internal 17.2.0 Darwin Kernel Version 17.2.0: Fri Sep 29 18:27:05 PDT 2017; root:xnu-4570.20.62~3/RELEASE_X86_64 x86_64
 
$ bash --version
GNU bash, version 4.4.12(1)-release (x86_64-apple-darwin16.3.0)

$ pip3 freeze | grep tqdm
tqdm==4.19.4

Labels: need-feedback 📢 We need your response (question), p0-bug-critical ☢ Exception rasing, to-review 🔍 Awaiting final confirmation