diff --git a/arrayqueues/__init__.py b/arrayqueues/__init__.py index 9dba91c..3d0eb95 100644 --- a/arrayqueues/__init__.py +++ b/arrayqueues/__init__.py @@ -8,4 +8,5 @@ __version__ = "1.2.0" from arrayqueues.shared_arrays import ArrayQueue + # from arrayqueues.portable_queue import PortableQueue diff --git a/arrayqueues/portable_queue.py b/arrayqueues/portable_queue.py index e40ce59..586ee69 100644 --- a/arrayqueues/portable_queue.py +++ b/arrayqueues/portable_queue.py @@ -2,10 +2,11 @@ https://gist.github.com/FanchenBao/d8577599c46eab1238a81857bb7277c9 """ -from multiprocessing import queues, Value, get_context +from multiprocessing import Value, get_context, queues + class SharedCounter(object): - """ A synchronized shared counter. + """A synchronized shared counter. The locking done by multiprocessing.Value ensures that only a single process or thread may read or write the in-memory ctypes object. However, @@ -20,9 +21,9 @@ class SharedCounter(object): """ def __init__(self, n=0): - self.count = Value('i', n) + self.count = Value("i", n) - def increment(self, n = 1): + def increment(self, n=1): """ Increment the counter by n (default = 1) """ with self.count.get_lock(): self.count.value += n @@ -34,7 +35,7 @@ def value(self): class PortableQueue(queues.Queue): - """ A portable implementation of multiprocessing.Queue. + """A portable implementation of multiprocessing.Queue. Because of multithreading / multiprocessing semantics, Queue.qsize() may raise the NotImplementedError exception on Unix platforms like Mac OS X @@ -49,16 +50,24 @@ class PortableQueue(queues.Queue): def __init__(self, *args, **kwargs): self.size = SharedCounter(0) - super(PortableQueue, self).__init__(*args, ctx=get_context(), - **kwargs) + super(PortableQueue, self).__init__(*args, ctx=get_context(), **kwargs) def __getstate__(self): state = super(PortableQueue, self).__getstate__() return state + (self.size,) def __setstate__(self, state): - (self._ignore_epipe, self._maxsize, self._reader, self._writer, - self._rlock, self._wlock, self._sem, self._opid, self.size) = state + ( + self._ignore_epipe, + self._maxsize, + self._reader, + self._writer, + self._rlock, + self._wlock, + self._sem, + self._opid, + self.size, + ) = state super(PortableQueue, self)._after_fork() def put(self, *args, **kwargs): @@ -77,5 +86,3 @@ def qsize(self): def empty(self): """ Reliable implementation of multiprocessing.Queue.empty() """ return not self.qsize() - - diff --git a/arrayqueues/shared_arrays.py b/arrayqueues/shared_arrays.py index 79583fd..235d333 100644 --- a/arrayqueues/shared_arrays.py +++ b/arrayqueues/shared_arrays.py @@ -1,12 +1,13 @@ +from datetime import datetime from multiprocessing import Array +from queue import Empty, Full -#try: -from arrayqueues.portable_queue import PortableQueue #as Queue -#except AttributeError: - # from multiprocessing import Queue +# except AttributeError: +# from multiprocessing import Queue import numpy as np -from datetime import datetime -from queue import Empty, Full + +# try: +from arrayqueues.portable_queue import PortableQueue # as Queue class ArrayView: @@ -122,11 +123,9 @@ def qsize(self): return self.queue.qsize() - class TimestampedArrayQueue(ArrayQueue): """A small extension to support timestamps saved alongside arrays""" - """ def put(self, element, timestamp=None): if self.view is None or not self.view.fits(element): diff --git a/arrayqueues/tests/test_portable_queue.py b/arrayqueues/tests/test_portable_queue.py index 97da2c6..c80ed5f 100644 --- a/arrayqueues/tests/test_portable_queue.py +++ b/arrayqueues/tests/test_portable_queue.py @@ -1,7 +1,9 @@ +import time from multiprocessing import Process -from arrayqueues.portable_queue import PortableQueue from queue import Empty -import time + +from arrayqueues.portable_queue import PortableQueue + class SourceProcess(Process): def __init__(self, n_elements): @@ -39,4 +41,4 @@ def test_portable_queue(): time.sleep(0.5) assert p1.source_queue.qsize() == 0 p2.join() - p1.join() \ No newline at end of file + p1.join() diff --git a/arrayqueues/tests/test_shared_arrays.py b/arrayqueues/tests/test_shared_arrays.py index 75d7b04..147b7e2 100644 --- a/arrayqueues/tests/test_shared_arrays.py +++ b/arrayqueues/tests/test_shared_arrays.py @@ -1,11 +1,14 @@ -from arrayqueues.shared_arrays import ArrayQueue, TimestampedArrayQueue, IndexedArrayQueue +import time from multiprocessing import Process -import multiprocessing.queues -from multiprocessing import Queue -import numpy as np from queue import Empty, Full -import unittest -import time + +import numpy as np + +from arrayqueues.shared_arrays import ( + ArrayQueue, + IndexedArrayQueue, + TimestampedArrayQueue, +) class SourceProcess(Process): @@ -52,7 +55,6 @@ def run(self): print(self.source_array.view.total_shape) - class SinkProcess(Process): def __init__(self, source_array, limit=None): super().__init__() @@ -63,7 +65,7 @@ def run(self): while True: try: item = self.source_array.get(timeout=0.5) - print('Got item') + print("Got item") assert item[0, 0] == 5 except Empty: break @@ -128,6 +130,3 @@ def test_clearing_queue(): p1.source_array.clear() time.sleep(1.0) assert p1.source_array.empty() - - - diff --git a/setup.py b/setup.py index 4e1d023..bfad377 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,5 @@ from distutils.core import setup + from setuptools import find_packages with open("requirements_dev.txt") as f: