-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
55 additions
and
22 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,43 +1,76 @@ | ||
import multiprocessing as mp | ||
import queue | ||
import pyrallel | ||
import os | ||
|
||
|
||
def f(iq, oq): | ||
# 30 bytes each | ||
# CONTENT = os.urandom(30) | ||
CONTENT = b'\xaa' * 10 + b'\xbb' * 10 + b'\xcc' * 10 | ||
|
||
|
||
def sender(sq): | ||
for _ in range(10): | ||
sq.put(CONTENT) | ||
|
||
|
||
def receiver(sq, q): | ||
try: | ||
while True: | ||
oq.put(iq.get(timeout=2)) | ||
content = sq.get(timeout=2) | ||
q.put(content) | ||
except queue.Empty: | ||
return | ||
|
||
|
||
class DummySerializer(object): | ||
def dumps(self, o): | ||
return o | ||
|
||
def loads(self, d): | ||
return d | ||
|
||
|
||
def test_shmqueue(): | ||
if not hasattr(pyrallel, 'ShmQueue'): | ||
return | ||
|
||
params = [ # chunk size, maxsize | ||
[50, 100], # chunk size > content, maxsize is enough | ||
[10, 100], # chunk size < content, maxsize is enough | ||
[50, 1], # chunk size > content, maxsize is limited | ||
[10, 1], # chunk size < content, maxsize is limited | ||
] | ||
|
||
for mode in ['fork', 'spawn']: | ||
mp.set_start_method(mode, force=True) | ||
ShmQueueCls = getattr(pyrallel, 'ShmQueue') | ||
sq = ShmQueueCls(chunk_size=1024 * 4, maxsize=5) | ||
q = mp.Queue() | ||
p1 = mp.Process(target=f, args=(sq, q,)) | ||
p2 = mp.Process(target=f, args=(sq, q,)) | ||
p1.start() | ||
p2.start() | ||
for param in params: | ||
sq = ShmQueueCls(chunk_size=param[0], maxsize=param[1], serializer=DummySerializer()) | ||
q = mp.Queue() | ||
# 3 senders and 2 receivers | ||
# each sender process add 10 content, in total 30 * 10 = 300 bytes | ||
p_senders = [mp.Process(target=sender, args=(sq,)) for _ in range(3)] | ||
p_receivers = [mp.Process(target=receiver, args=(sq, q)) for _ in range(2)] | ||
|
||
items = list(range(10)) | ||
for p in p_senders: | ||
p.start() | ||
for p in p_receivers: | ||
p.start() | ||
|
||
for i in items: | ||
sq.put(i) | ||
for p in p_senders: | ||
p.join() | ||
for p in p_receivers: | ||
p.join() | ||
sq.close() | ||
|
||
while True: | ||
try: | ||
e = q.get(timeout=2) | ||
assert e in items | ||
except queue.Empty: | ||
break | ||
|
||
p1.join() | ||
p2.join() | ||
sq.close() | ||
q.close() | ||
total_put = 30 # there should be in total 30 elements | ||
while True: | ||
try: | ||
r = q.get(timeout=2) | ||
total_put -= 1 | ||
assert r == CONTENT | ||
except queue.Empty: | ||
break | ||
|
||
assert total_put == 0 |