Skip to content
This repository has been archived by the owner on Oct 26, 2022. It is now read-only.

Commit

Permalink
Prevent Queue.qsize() from raising NotImplementedError on Mac OS X
Browse files Browse the repository at this point in the history
multiprocessing.Queue.qsize() may raise NotImplementedError on Unix
platforms like Mac OS X where sem_getvalue() is not implemented. In
order to address this problem, subclass Queue and use an internal
synchronized shared counter whose value is updated when the put() and
get() methods are called. As a side benefit, this makes it possible
to have a reliable version of both qsize() and empty(). Thanks to
Jean-Baptiste Marquette for reporting this problem (#11).

This merges branch 'fix-Queue.qsize-NotImplementedError'
  • Loading branch information
Víctor Terrón committed May 7, 2014
2 parents 4f84dc5 + 75d93bd commit 9ca6b4b
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 5 deletions.
1 change: 1 addition & 0 deletions Misc/ACKS
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ Javier Blasco, our most enthusiastic user, for such thoughtful suggestions
Pablo Ramírez, the second person to ever use LEMON — that we know of
Sofía León, for designing for us such a marvelous LEMON icon
José Enrique Ruiz, for suggesting that we set extglob
Jean-Baptiste Marquette, for reporting a bug under Mac OS X
2 changes: 1 addition & 1 deletion diffphot.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ def best(self, n, fraction = 0.1, pct = 0.01, max_iters = None, minimum = None):
# The Queue is global -- this works, but note that we could have
# passed its reference to the function managed by pool.map_async.
# See http://stackoverflow.com/a/3217427/184363
queue = multiprocessing.Queue()
queue = methods.Queue()

def parallel_light_curves(args):
""" Method argument of map_async to compute light curves in parallel.
Expand Down
65 changes: 65 additions & 0 deletions methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import functools
import logging
import math
import multiprocessing
import multiprocessing.queues
import numpy
import os
import os.path
Expand Down Expand Up @@ -553,3 +555,66 @@ def log_error(function, path, excinfo):
else:
msg = "Temporary file '%s' removed"
logging.debug(msg % path)

class SharedCounter(object):
""" A synchronized shared counter.
The locking done by multiprocessing.Value ensures that only a single
process or thread may read or write the in-memory ctypes object. However,
in order to do n += 1, Python performs a read followed by a write, so a
second process may read the old value before the new one is written by the
first process. The solution is to use a multiprocessing.Lock to guarantee
the atomicity of the modifications to Value.
This class comes almost entirely from Eli Bendersky's blog:
http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
"""

def __init__(self, n = 0):
self.count = multiprocessing.Value('i', n)

def increment(self, n = 1):
""" Increment the counter by n (default = 1) """
with self.count.get_lock():
self.count.value += n

@property
def value(self):
""" Return the value of the counter """
return self.count.value


class Queue(multiprocessing.queues.Queue):
""" A portable implementation of multiprocessing.Queue.
Because of multithreading / multiprocessing semantics, Queue.qsize() may
raise the NotImplementedError exception on Unix platforms like Mac OS X
where sem_getvalue() is not implemented. This subclass addresses this
problem by using a synchronized shared counter (initialized to zero) and
increasing / decreasing its value every time the put() and get() methods
are called, respectively. This not only prevents NotImplementedError from
being raised, but also allows us to implement a reliable version of both
qsize() and empty().
"""

def __init__(self, *args, **kwargs):
super(Queue, self).__init__(*args, **kwargs)
self.size = SharedCounter(0)

def put(self, *args, **kwargs):
self.size.increment(1)
super(Queue, self).put(*args, **kwargs)

def get(self, *args, **kwargs):
self.size.increment(-1)
return super(Queue, self).get(*args, **kwargs)

def qsize(self):
""" Reliable implementation of multiprocessing.Queue.qsize() """
return self.size.value

def empty(self):
""" Reliable implementation of multiprocessing.Queue.empty() """
return not self.qsize()
2 changes: 1 addition & 1 deletion periods.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def best_period(self, initial_step, exhaustive_step):
# The Queue is global -- this works, but note that we could have
# passed its reference to the function managed by pool.map_async.
# See http://stackoverflow.com/a/3217427/184363
queue = multiprocessing.Queue()
queue = methods.Queue()

def parallel_periods(args):
""" Method argument of map_async to compute periods in parallel.
Expand Down
4 changes: 2 additions & 2 deletions photometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
# The Queue is global -- this works, but note that we could have
# passed its reference to the function managed by pool.map_async.
# See http://stackoverflow.com/a/3217427/184363
queue = multiprocessing.Queue()
queue = methods.Queue()

class InputFITSFiles(collections.defaultdict):
""" Map each photometric filter to a list of FITS files.
Expand Down Expand Up @@ -197,7 +197,7 @@ def parallel_photometry(args):
the FITS image listed in options.coordinates, using the aperture, annulus
and dannulus defined by the PhotometricParameters object. The result is
another three-element tuple, which is put into the module-level 'queue'
multiprocessing.Queue object. This tuple contains (1) a database.Image
object, a process shared queue. This tuple contains (1) a database.Image
object, (2) a database.PhotometricParameters object and (3) a qphot.QPhot
object -- therefore mapping each FITS file and the parameters used for
photometry to the measurements returned by qphot.
Expand Down
2 changes: 1 addition & 1 deletion seeing.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ def elongation(self, per = 50, mode = 'median'):
# This Queue is global -- this works, but note that we could have
# passed its reference to the function managed by pool.map_async.
# See http://stackoverflow.com/a/3217427/184363
queue = multiprocessing.Queue()
queue = methods.Queue()

parser = customparser.get_parser(description)
parser.usage = "%prog [OPTION]... INPUT_IMGS... OUTPUT_DIR"
Expand Down

2 comments on commit 9ca6b4b

@johcode
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this workaround helped me a lot, thanks for that.
In my case I had to adjust the code as follows.

`class Queue(multiprocessing.queues.Queue):
"""
Important changes:

- Bug: TypeError: __init__() missing 1 required keyword-only argument: 'ctx'
see:
https://stackoverflow.com/questions/24941359/ctx-parameter-in-multiprocessing-queue

- Bug: .get and .put might raise and handled exception.
Therefore, first call .get() or .put() then increment

- .empty() should return True or False

"""

def __init__(self, *args, **kwargs):
    super(Queue, self).__init__(*args, ctx=multiprocessing.get_context(), **kwargs)
    self._size = SharedCounter(0)

def put(self, *args, **kwargs):
    super(Queue, self).put(*args, **kwargs)
    self._size.increment(1)

def get(self, *args, **kwargs):
    val = super(Queue, self).get(*args, **kwargs)
    self._size.increment(-1)
    return val

def qsize(self) -> int:
    """ Reliable implementation of multiprocessing.Queue.qsize() """
    return self._size.value

def empty(self) -> bool:
    """ Reliable implementation of multiprocessing.Queue.empty() """
    return self.qsize() == 0`

@vterron
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to hear it was useful, @johcode!

  • Bug: TypeError: init() missing 1 required keyword-only argument: 'ctx'

Would you like to send a pull request? (so that the code works in both Python 2 and 3).

  • Bug: .get and .put might raise and handled exception.

This was fixed by @dchecks in #103.

  • .empty() should return True or False

I think that's the case already: we do not self.qsize() — the truth value of 0 is False, so that would return True. For all the other numbers, their truth value is True, which becomes False when we negate it.

Please sign in to comment.