Skip to content

Commit

Permalink
make shmqueue support spawn mode
Browse files Browse the repository at this point in the history
  • Loading branch information
GreatYYX committed Jul 14, 2020
1 parent a78559f commit 6066a2f
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 25 deletions.
14 changes: 14 additions & 0 deletions pyrallel/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import struct
import sys
import time
import dill


if sys.version_info >= (3, 8):
Expand Down Expand Up @@ -73,6 +74,19 @@ def __init__(self, chunk_size=1*1024*1024, maxsize=0, serializer=None):
for _ in range(maxsize):
self.data_blocks.append(SharedMemory(create=True, size=self.chunk_size))

def __getstate__(self):
return (dill.dumps(self.serializer), self.chunk_size, self.producer_lock,
self.consumer_lock, self.block_locks, dill.dumps(self.meta_blocks), dill.dumps(self.data_blocks))

def __setstate__(self, state):
(self.serializer, self.chunk_size, self.producer_lock,
self.consumer_lock, self.block_locks, self.meta_blocks, self.data_blocks) = state
self.buf_msg_id = None
self.buf_msg_body = None
self.meta_blocks = dill.loads(self.meta_blocks)
self.data_blocks = dill.loads(self.data_blocks)
self.serializer = dill.loads(self.serializer)

def get_meta(self, block, type_):
addr_s, addr_e, ctype = self.__class__.META_STRUCT.get(type_)
return struct.unpack(ctype, block.buf[addr_s : addr_e])[0]
Expand Down
51 changes: 26 additions & 25 deletions pyrallel/tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,29 @@ def test_shmqueue():
if not hasattr(pyrallel, 'ShmQueue'):
return

mp.set_start_method('fork')
ShmQueueCls = getattr(pyrallel, 'ShmQueue')
sq = ShmQueueCls(chunk_size=1024 * 4, maxsize=5)
q = mp.Queue()
p1 = mp.Process(target=f, args=(sq, q,))
p2 = mp.Process(target=f, args=(sq, q,))
p1.start()
p2.start()

items = list(range(10))

for i in items:
sq.put(i)

while True:
try:
e = q.get(timeout=2)
assert e in items
except queue.Empty:
break

p1.join()
p2.join()
sq.close()
q.close()
for mode in ['fork', 'spawn']:
mp.set_start_method(mode, force=True)
ShmQueueCls = getattr(pyrallel, 'ShmQueue')
sq = ShmQueueCls(chunk_size=1024 * 4, maxsize=5)
q = mp.Queue()
p1 = mp.Process(target=f, args=(sq, q,))
p2 = mp.Process(target=f, args=(sq, q,))
p1.start()
p2.start()

items = list(range(10))

for i in items:
sq.put(i)

while True:
try:
e = q.get(timeout=2)
assert e in items
except queue.Empty:
break

p1.join()
p2.join()
sq.close()
q.close()

0 comments on commit 6066a2f

Please sign in to comment.