Skip to content

Commit

Permalink
Merge pull request #14 from CraigMiloRogers/master
Browse files Browse the repository at this point in the history
Protect against importing ShmQueue on Python versions before 3.8.
  • Loading branch information
CraigMiloRogers committed Nov 25, 2020
2 parents c5fd44d + fc37d9f commit 053014c
Show file tree
Hide file tree
Showing 3 changed files with 497 additions and 168 deletions.
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,10 @@ docs:

release:
@VERSION=$$(python -c "from pyrallel.__version__ import __version__;print(__version__)") && git tag $$VERSION

# locate all the files in this directory or below:
FILES=`find . -name '*.py'`

# The command for running mypy:
lint:
python3 -m mypy $(FILES)
60 changes: 45 additions & 15 deletions pyrallel/parallel_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,14 @@ def collector(data):
import threading
import queue
import inspect
import sys
import typing
from typing import Callable, Iterable

from pyrallel import Paralleller, ShmQueue
from pyrallel import Paralleller

if sys.version_info >= (3, 8):
from pyrallel import ShmQueue

class Mapper(object):
"""
Expand Down Expand Up @@ -206,7 +209,16 @@ class ParallelProcessor(Paralleller):
It defaults to False.
batch_size (int, optional): Batch size, defaults to 1.
progress (Callable, optional): Progress inspection. Defaults to None.
use_shm (bool, optional): When True, and when riunning on Python version 3.8 or later,
use ShmQueue for higher performance. Defaults to False.
enable_collector_queues (bool, optional): When True, create a collector queue for each
processor. When False, do not allocate collector queues, saving
resources. Defaults to True.
single_mapper_queue (bool, optional): When True, allocate a single mapper queue that will
be shared between the worker processes. Sending processes can
go to sleep when the mapper queue is full. When False, each process
gets its own mapper queue, and CPU-intensive polling may be needed to
find a mapper queue which can accept a new request.
Note:
- Do NOT implement heavy compute-intensive operations in collector, they should be in mapper.
Expand All @@ -226,15 +238,22 @@ def __init__(self, num_of_processor: int, mapper: Callable, max_size_per_mapper_
single_mapper_queue: bool = False):
self.num_of_processor = num_of_processor
self.single_mapper_queue = single_mapper_queue
if sys.version_info >= (3, 8):
self.collector_queues: typing.Optional[typing.Union[ShmQueue, mp.Queue]]
else:
self.collector_queues: typing.Optional[mp.Queue]
if use_shm:
if single_mapper_queue:
self.mapper_queues = [ShmQueue(maxsize=max_size_per_mapper_queue * num_of_processor)]
else:
self.mapper_queues = [ShmQueue(maxsize=max_size_per_mapper_queue) for _ in range(num_of_processor)]
if enable_collector_queues:
self.collector_queues = [ShmQueue(maxsize=max_size_per_collector_queue) for _ in range(num_of_processor)]
if sys.version_info >= (3, 8):
if single_mapper_queue:
self.mapper_queues = [ShmQueue(maxsize=max_size_per_mapper_queue * num_of_processor)]
else:
self.mapper_queues = [ShmQueue(maxsize=max_size_per_mapper_queue) for _ in range(num_of_processor)]
if enable_collector_queues:
self.collector_queues = [ShmQueue(maxsize=max_size_per_collector_queue) for _ in range(num_of_processor)]
else:
self.collector_queues = None
else:
self.collector_queues = None
raise ValueError("shm not available in this version of Python.")
else:
if single_mapper_queue:
self.mapper_queues = [mp.Queue(maxsize=max_size_per_mapper_queue * num_of_processor)]
Expand All @@ -245,7 +264,7 @@ def __init__(self, num_of_processor: int, mapper: Callable, max_size_per_mapper_
else:
self.collector_queues = None

if enable_collector_queues:
if self.collector_queues is not None:
if single_mapper_queue:
self.processes = [mp.Process(target=self._run, args=(i, self.mapper_queues[0], self.collector_queues[i]))
for i in range(num_of_processor)]
Expand All @@ -260,8 +279,15 @@ def __init__(self, num_of_processor: int, mapper: Callable, max_size_per_mapper_
self.processes = [mp.Process(target=self._run, args=(i, self.mapper_queues[i], None))
for i in range(num_of_processor)]
if progress is not None:
if sys.version_info >= (3, 8):
self.progress_queues: typing.Optional[typing.Union[ShmQueue, mp.Queue]]
else:
self.progress_queues: typing.Optional[mp.Queue]
if use_shm:
self.progress_queues = [ShmQueue(maxsize=1) for _ in range(num_of_processor)]
if sys.version_info >= (3, 8):
self.progress_queues = [ShmQueue(maxsize=1) for _ in range(num_of_processor)]
else:
raise ValueError("shm not available in this version of Python.")
else:
self.progress_queues = [mp.Queue(maxsize=1) for _ in range(num_of_processor)]
else:
Expand Down Expand Up @@ -342,8 +368,12 @@ def task_done(self):

def add_task(self, *args, **kwargs):
"""
Add data to one of the mapper queues.
(main process, unblocked, using round robin to find next available queue)
Add data to one a mapper queue.
When a single mapper queue is in use, put the process to sleep if the
queue is full. When multiple mapper queues are in use (one per process),
use CPU-intensive polling (round-robin processing) to find the next available
queue. (main process, blocked or unblocked depending upon single_mapper_queue)
"""
self.batch_data.append((args, kwargs))
if self.progress:
Expand All @@ -368,7 +398,7 @@ def _add_task(self, batched_args):

def _run(self, idx: int, mapper_queue: mp.Queue, collector_queue: typing.Optional[mp.Queue]):
"""
Processs activity. It handles queue IO and invokes user's mapper handler.
Process's activity. It handles queue IO and invokes user's mapper handler.
(subprocess, blocked, only two queues can be used to communicate with main process)
"""
with self.mapper(idx) as mapper:
Expand All @@ -377,7 +407,7 @@ def _run(self, idx: int, mapper_queue: mp.Queue, collector_queue: typing.Optiona
if data[0] == ParallelProcessor.CMD_STOP:
# print(idx, 'stop')
self._update_progress(mapper, finish=True)
if self.collector:
if self.collector and collector_queue is not None:
collector_queue.put((ParallelProcessor.CMD_STOP,))
return
elif data[0] == ParallelProcessor.CMD_DATA:
Expand Down

0 comments on commit 053014c

Please sign in to comment.