Skip to content

Commit

Permalink
merged master & linted
Browse files Browse the repository at this point in the history
  • Loading branch information
vigji committed Feb 17, 2021
1 parent b7b61b4 commit 9370165
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 33 deletions.
1 change: 1 addition & 0 deletions arrayqueues/__init__.py
Expand Up @@ -8,4 +8,5 @@
__version__ = "1.2.0"

from arrayqueues.shared_arrays import ArrayQueue

# from arrayqueues.portable_queue import PortableQueue
29 changes: 18 additions & 11 deletions arrayqueues/portable_queue.py
Expand Up @@ -2,10 +2,11 @@
https://gist.github.com/FanchenBao/d8577599c46eab1238a81857bb7277c9
"""

from multiprocessing import queues, Value, get_context
from multiprocessing import Value, get_context, queues


class SharedCounter(object):
""" A synchronized shared counter.
"""A synchronized shared counter.
The locking done by multiprocessing.Value ensures that only a single
process or thread may read or write the in-memory ctypes object. However,
Expand All @@ -20,9 +21,9 @@ class SharedCounter(object):
"""

def __init__(self, n=0):
self.count = Value('i', n)
self.count = Value("i", n)

def increment(self, n = 1):
def increment(self, n=1):
""" Increment the counter by n (default = 1) """
with self.count.get_lock():
self.count.value += n
Expand All @@ -34,7 +35,7 @@ def value(self):


class PortableQueue(queues.Queue):
""" A portable implementation of multiprocessing.Queue.
"""A portable implementation of multiprocessing.Queue.
Because of multithreading / multiprocessing semantics, Queue.qsize() may
raise the NotImplementedError exception on Unix platforms like Mac OS X
Expand All @@ -49,16 +50,24 @@ class PortableQueue(queues.Queue):

def __init__(self, *args, **kwargs):
self.size = SharedCounter(0)
super(PortableQueue, self).__init__(*args, ctx=get_context(),
**kwargs)
super(PortableQueue, self).__init__(*args, ctx=get_context(), **kwargs)

def __getstate__(self):
state = super(PortableQueue, self).__getstate__()
return state + (self.size,)

def __setstate__(self, state):
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid, self.size) = state
(
self._ignore_epipe,
self._maxsize,
self._reader,
self._writer,
self._rlock,
self._wlock,
self._sem,
self._opid,
self.size,
) = state
super(PortableQueue, self)._after_fork()

def put(self, *args, **kwargs):
Expand All @@ -77,5 +86,3 @@ def qsize(self):
def empty(self):
""" Reliable implementation of multiprocessing.Queue.empty() """
return not self.qsize()


15 changes: 7 additions & 8 deletions arrayqueues/shared_arrays.py
@@ -1,12 +1,13 @@
from datetime import datetime
from multiprocessing import Array
from queue import Empty, Full

#try:
from arrayqueues.portable_queue import PortableQueue #as Queue
#except AttributeError:
# from multiprocessing import Queue
# except AttributeError:
# from multiprocessing import Queue
import numpy as np
from datetime import datetime
from queue import Empty, Full

# try:
from arrayqueues.portable_queue import PortableQueue # as Queue


class ArrayView:
Expand Down Expand Up @@ -122,11 +123,9 @@ def qsize(self):
return self.queue.qsize()



class TimestampedArrayQueue(ArrayQueue):
"""A small extension to support timestamps saved alongside arrays"""

"""
def put(self, element, timestamp=None):

if self.view is None or not self.view.fits(element):
Expand Down
8 changes: 5 additions & 3 deletions arrayqueues/tests/test_portable_queue.py
@@ -1,7 +1,9 @@
import time
from multiprocessing import Process
from arrayqueues.portable_queue import PortableQueue
from queue import Empty
import time

from arrayqueues.portable_queue import PortableQueue


class SourceProcess(Process):
def __init__(self, n_elements):
Expand Down Expand Up @@ -39,4 +41,4 @@ def test_portable_queue():
time.sleep(0.5)
assert p1.source_queue.qsize() == 0
p2.join()
p1.join()
p1.join()
21 changes: 10 additions & 11 deletions arrayqueues/tests/test_shared_arrays.py
@@ -1,11 +1,14 @@
from arrayqueues.shared_arrays import ArrayQueue, TimestampedArrayQueue, IndexedArrayQueue
import time
from multiprocessing import Process
import multiprocessing.queues
from multiprocessing import Queue
import numpy as np
from queue import Empty, Full
import unittest
import time

import numpy as np

from arrayqueues.shared_arrays import (
ArrayQueue,
IndexedArrayQueue,
TimestampedArrayQueue,
)


class SourceProcess(Process):
Expand Down Expand Up @@ -52,7 +55,6 @@ def run(self):
print(self.source_array.view.total_shape)



class SinkProcess(Process):
def __init__(self, source_array, limit=None):
super().__init__()
Expand All @@ -63,7 +65,7 @@ def run(self):
while True:
try:
item = self.source_array.get(timeout=0.5)
print('Got item')
print("Got item")
assert item[0, 0] == 5
except Empty:
break
Expand Down Expand Up @@ -128,6 +130,3 @@ def test_clearing_queue():
p1.source_array.clear()
time.sleep(1.0)
assert p1.source_array.empty()



1 change: 1 addition & 0 deletions setup.py
@@ -1,4 +1,5 @@
from distutils.core import setup

from setuptools import find_packages

with open("requirements_dev.txt") as f:
Expand Down

0 comments on commit 9370165

Please sign in to comment.