Skip to content

Commit

Permalink
Merge pull request #11 from CraigMiloRogers/master
Browse files Browse the repository at this point in the history
Resolved Problems
  • Loading branch information
CraigMiloRogers committed Sep 9, 2020
2 parents 4cb3581 + e7dedf6 commit eddcd0a
Show file tree
Hide file tree
Showing 2 changed files with 469 additions and 117 deletions.
55 changes: 43 additions & 12 deletions pyrallel/parallel_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ def collector(data):
import threading
import queue
import inspect
import typing
from typing import Callable, Iterable

from pyrallel import Paralleller
from pyrallel import Paralleller, ShmQueue


class Mapper(object):
Expand Down Expand Up @@ -221,13 +222,34 @@ class ParallelProcessor(Paralleller):

def __init__(self, num_of_processor: int, mapper: Callable, max_size_per_mapper_queue: int = 0,
collector: Callable = None, max_size_per_collector_queue: int = 0,
enable_process_id: bool = False, batch_size: int = 1, progress=None):
enable_process_id: bool = False, batch_size: int = 1, progress=None, use_shm=False, enable_collector_queues=True):
self.num_of_processor = num_of_processor
self.mapper_queues = [mp.Queue(maxsize=max_size_per_mapper_queue) for _ in range(num_of_processor)]
self.collector_queues = [mp.Queue(maxsize=max_size_per_collector_queue) for _ in range(num_of_processor)]
self.processes = [mp.Process(target=self._run, args=(i, self.mapper_queues[i], self.collector_queues[i]))
for i in range(num_of_processor)]
self.progress_queues = [mp.Queue(maxsize=1) for _ in range(num_of_processor)]
if use_shm:
self.mapper_queues = [ShmQueue(maxsize=max_size_per_mapper_queue) for _ in range(num_of_processor)]
if enable_collector_queues:
self.collector_queues = [ShmQueue(maxsize=max_size_per_collector_queue) for _ in range(num_of_processor)]
else:
self.collector_queues = None
else:
self.mapper_queues = [mp.Queue(maxsize=max_size_per_mapper_queue) for _ in range(num_of_processor)]
if enable_collector_queues:
self.collector_queues = [mp.Queue(maxsize=max_size_per_collector_queue) for _ in range(num_of_processor)]
else:
self.collector_queues = None

if enable_collector_queues:
self.processes = [mp.Process(target=self._run, args=(i, self.mapper_queues[i], self.collector_queues[i]))
for i in range(num_of_processor)]
else:
self.processes = [mp.Process(target=self._run, args=(i, self.mapper_queues[i], None))
for i in range(num_of_processor)]
if progress is not None:
if use_shm:
self.progress_queues = [ShmQueue(maxsize=1) for _ in range(num_of_processor)]
else:
self.progress_queues = [mp.Queue(maxsize=1) for _ in range(num_of_processor)]
else:
self.progress_queues = None
self.progress = progress

ctx = self
Expand Down Expand Up @@ -278,6 +300,14 @@ def join(self):
self.progress_thread.join()
for p in self.processes:
p.join()
for q in self.mapper_queues:
q.close()
if self.collector_queues is not None:
for q in self.collector_queues:
q.close()
if self.progress_queues is not None:
for q in self.progress_queues:
q.close()

def task_done(self):
"""
Expand Down Expand Up @@ -314,7 +344,7 @@ def _add_task(self, batched_args):
except queue.Full:
continue # find next available

def _run(self, idx: int, mapper_queue: mp.Queue, collector_queue: mp.Queue):
def _run(self, idx: int, mapper_queue: mp.Queue, collector_queue: typing.Optional[mp.Queue]):
"""
Process’s activity. It handles queue IO and invokes user's mapper handler.
(subprocess, blocked, only two queues can be used to communicate with main process)
Expand All @@ -336,11 +366,12 @@ def _run(self, idx: int, mapper_queue: mp.Queue, collector_queue: mp.Queue):
self._update_progress(mapper, type_=ProgressThread.P_LOADED)
result = mapper.process(*args, **kwargs)
self._update_progress(mapper, type_=ProgressThread.P_PROCESSED)
if self.collector:
if not isinstance(result, tuple): # collector must represent as tuple
result = (result,)
if collector_queue is not None:
if self.collector:
if not isinstance(result, tuple): # collector must represent as tuple
result = (result,)
batch_result.append(result)
if len(batch_result) > 0:
if collector_queue is not None and len(batch_result) > 0:
collector_queue.put((ParallelProcessor.CMD_DATA, batch_result))
batch_result = [] # reset buffer

Expand Down

0 comments on commit eddcd0a

Please sign in to comment.