Merge pull request #15 from CraigMiloRogers/master
Improved documentation and defaults for chunk_size and maxsize
GreatYYX committed Dec 8, 2020
2 parents 4dab889 + c5ea0ab commit 6ca5ba8
Showing 2 changed files with 36 additions and 15 deletions.
6 changes: 6 additions & 0 deletions Makefile
@@ -12,3 +12,9 @@ FILES=`find . -name '*.py'`
 # The command for running mypy:
 lint:
 	python3 -m mypy $(FILES)
+
+# Run the unit tests.
+test:
+	python3 -m pytest -s pyrallel/tests/test_map_reduce.py
+	python3 -m pytest -s pyrallel/tests/test_parallel_processor.py
+	python3 -m pytest -s pyrallel/tests/test_queue.py
45 changes: 30 additions & 15 deletions pyrallel/queue.py
@@ -42,10 +42,17 @@ class ShmQueue(mpq.Queue):
     message using the next_block_id field in the shared buffer's metadata
     area.
     Messages are serialized for transfer from the sender to the receiver.
+    The serialized size of a message may not exceed the chunk size times
+    the maximum queue size. If the deadlock_immanent_check is enabled
+    (which is True by default), a ValueError will be raised on an attempt
+    to put a message that is too large.
     Args:
-        chunk_size (int, optional): Size of each chunk. By default, it is 1*1024*1024.
-        maxsize (int, optional): Maximum size of queue. If it is 0 (default), \
-            it will be set to `ShmQueue.MAX_CHUNK_SIZE`.
+        chunk_size (int, optional): Size of each chunk. By default, it is `ShmQueue.DEFAULT_CHUNK_SIZE` (1*1024*1024). \
+            If it is 0, it will be set to `ShmQueue.MAX_CHUNK_SIZE` (512*1024*1024).
+        maxsize (int, optional): Maximum queue size, e.g. the maximum number of chunks available to a queue. \
+            If it is 0 (default), it will be set to `ShmQueue.DEFAULT_MAXSIZE` (2).
         serializer (obj, optional): Serializer to serialize and deserialize data. \
             If it is None (default), pickle will be used. \
             The serializer should implement `loads(bytes data) -> object` \
@@ -69,15 +76,15 @@ class ShmQueue(mpq.Queue):
     Note:
         - `close` needs to be invoked once to release memory and avoid a memory leak.
         - `qsize`, `empty` and `full` are implemented but may block.
-        - Each shared queue consumes one chared memory area for the shared list heads
+        - Each shared queue consumes one shared memory area for the shared list heads
           and one shared memory area for each shared buffer. The underlying code in
           multiprocessing.shared_memory.SharedMemory consumes one process file descriptor
-          for each shared memory area. There is a limit ont he number of file descriptors
+          for each shared memory area. There is a limit on the number of file descriptors
           that a process may have open.
         - Thus, there is a tradeoff between the chunk_size and maxsize: smaller chunks
           use memory more effectively with some overhead cost, but may run into the limit
           on the number of open file descriptors to process large messages and avoid blocking.
-          Larger chunks waste unused space, mut are less likely to run into the open file descriptor
+          Larger chunks waste unused space, but are less likely to run into the open file descriptor
           limit or to block waiting for a free buffer.
     Example::
@@ -97,9 +104,15 @@ def run(q):
     """

     MAX_CHUNK_SIZE: int = 512 * 1024 * 1024
-    """int: The maximum allowable size for a buffer chunk; also, the default size. 512MB should be a large enough
+    """int: The maximum allowable size for a buffer chunk. 512MB should be a large enough
     value."""

+    DEFAULT_CHUNK_SIZE: int = 1 * 1024 * 1024
+    """int: The default size for a buffer chunk."""
+
+    DEFAULT_MAXSIZE: int = 2
+    """int: The default maximum size for a queue."""
+
     RESERVED_BLOCK_ID: int = 0xffffffff
     """int: RESERVED_BLOCK_ID is stored in the list head pointer and next chunk
     block id fields to indicate that there is no next block. This value is intended
@@ -154,8 +167,8 @@ def run(q):
     be created and presented with the shared message queue's creator's pid."""

     def __init__(self,
-                 chunk_size: int=1*1024*1024,
-                 maxsize: int=2,
+                 chunk_size: int=DEFAULT_CHUNK_SIZE,
+                 maxsize: int=DEFAULT_MAXSIZE,
                  serializer=None,
                  integrity_check: bool=False,
                  deadlock_check: bool=False,
@@ -176,7 +189,8 @@ def __init__(self,

         self.chunk_size: int = min(chunk_size, self.__class__.MAX_CHUNK_SIZE) \
             if chunk_size > 0 else self.__class__.MAX_CHUNK_SIZE
-        self.maxsize: int = maxsize
+
+        self.maxsize: int = maxsize if maxsize > 0 else self.__class__.DEFAULT_MAXSIZE

         self.serializer = serializer or pickle

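The size handling in this hunk, together with the capacity rule stated in the class docstring (serialized size may not exceed chunk size times maximum queue size), can be sketched outside the class. This is a minimal illustration, not the library's code: `resolve_sizes` and `fits` are hypothetical helper names, the constants are copied from the diff, and the sketch assumes pickle as the serializer and ignores any per-chunk metadata overhead.

```python
import pickle
from typing import Any, Tuple

# Values copied from the class constants shown in this commit.
MAX_CHUNK_SIZE = 512 * 1024 * 1024
DEFAULT_MAXSIZE = 2


def resolve_sizes(chunk_size: int, maxsize: int) -> Tuple[int, int]:
    """Hypothetical helper mirroring the two assignments in __init__."""
    # A non-positive chunk_size selects the maximum; larger values are clamped.
    resolved_chunk = min(chunk_size, MAX_CHUNK_SIZE) if chunk_size > 0 else MAX_CHUNK_SIZE
    # A non-positive maxsize falls back to the default number of chunks.
    resolved_max = maxsize if maxsize > 0 else DEFAULT_MAXSIZE
    return resolved_chunk, resolved_max


def fits(msg: Any, chunk_size: int, maxsize: int) -> bool:
    """Hypothetical check: does the pickled message fit in chunk_size * maxsize bytes?"""
    chunk, count = resolve_sizes(chunk_size, maxsize)
    return len(pickle.dumps(msg)) <= chunk * count


if __name__ == "__main__":
    print(resolve_sizes(0, 0))
    print(fits(b"x" * 5000, 1024, 2))
```

Under these assumptions, a 5000-byte payload does not fit a queue of two 1 KiB chunks, which is the case the new `ValueError` entry in the `put` docstring describes.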
@@ -609,16 +623,17 @@ def next_readable_msg(self, block: bool, timeout: typing.Optional[float]=None)->
     def put(self, msg: typing.Any, block: bool=True, timeout: typing.Optional[float]=None):

         """
-        Put an object into a shaed memory queue.
+        Put an object into a shared memory queue.
         Args:
-            msg (obj): The object which needs to put into queue.
+            msg (obj): The object which is to be put into queue.
             block (bool, optional): If it is set to True (default), it will return after an item is put into queue.
-            timeout (int, optional): It can be any positive integer and only effective when `block` is set to True.
+            timeout (int, optional): A positive integer for the timeout duration in seconds, which is only effective when `block` is set to True.
         Raises:
             queue.Full: Raised if the call times out or the queue is full when `block` is False.
             ValueError: An internal error occurred in accessing the message's metadata.
+            ValueError: A request was made to send a message that, when serialized, exceeds the capacity of the queue.
             PicklingError: This exception is raised when the serializer is pickle and
                 an error occurred in serializing the message.
             UnpicklingError: This exception is raised when the serializer is pickle and
@@ -753,10 +768,10 @@ def get(self, block: bool=True, timeout: typing.Optional[float]=None)->typing.An
         Args:
             block (bool, optional): If it is set to True (default), it will only return when an item is available.
-            timeout (int, optional): It can be any positive integer and only effective when `block` is set to True.
+            timeout (int, optional): A positive integer for the timeout duration in seconds, which is only effective when `block` is set to True.
         Returns:
-            object: Object.
+            object: A message object retrieved from the queue.
         Raises:
             queue.Empty: This exception will be raised if it times out or queue is empty when `block` is False.
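The `block`/`timeout` contract documented for `put` and `get` follows the convention of the standard library's `queue` module, including the `queue.Full` and `queue.Empty` exception types. The sketch below demonstrates that contract with `queue.Queue` as a stand-in, not with `ShmQueue` itself; `try_put` and `try_get` are hypothetical wrapper names, and it is an assumption that `ShmQueue` behaves the same way for these calls.

```python
import queue


def try_put(q, msg, timeout=None) -> bool:
    """Return True if msg was enqueued, False if the queue stayed full."""
    try:
        if timeout is None:
            q.put(msg, block=False)  # fail immediately when full
        else:
            q.put(msg, block=True, timeout=timeout)  # wait up to timeout seconds
        return True
    except queue.Full:
        return False


def try_get(q, timeout=None, default=None):
    """Return the next item, or default if none arrives in time."""
    try:
        if timeout is None:
            return q.get(block=False)  # fail immediately when empty
        return q.get(block=True, timeout=timeout)
    except queue.Empty:
        return default


# Stand-in queue from the standard library (NOT ShmQueue), two slots.
q = queue.Queue(maxsize=2)
results = [try_put(q, n) for n in range(3)]  # third put finds the queue full
timed_out = try_put(q, 99, timeout=0.05)     # waits briefly, then gives up
drained = [try_get(q), try_get(q), try_get(q, timeout=0.05)]
```

Wrapping the exceptions this way keeps caller code free of try/except blocks, at the cost of not distinguishing a full queue from a timeout.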