Skip to content

Commit

Permalink
add doc for parallel processor
Browse files Browse the repository at this point in the history
  • Loading branch information
GreatYYX committed Aug 17, 2018
1 parent 9c686ff commit 2edd0e4
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 15 deletions.
3 changes: 2 additions & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#
import os
import sys
import datetime
sys.path.insert(0, os.path.abspath('../rltk'))
sys.path.insert(0, os.path.abspath('../'))

Expand Down Expand Up @@ -47,7 +48,7 @@

# General information about the project.
project = u'RLTK'
copyright = u'2017, USC/ISI'
copyright = u'{}, USC/ISI'.format(datetime.datetime.now().year)
author = u'USC/ISI'

# The version info for the project you're documenting, acts as replacement for
Expand Down
5 changes: 5 additions & 0 deletions docs/mod_parallel_processor.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ParallelProcessor
=================

.. automodule:: rltk.parallel_processor
:members:
1 change: 1 addition & 0 deletions docs/modules.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ API Reference
:maxdepth: 4

mod_dataset.rst
mod_parallel_processor.rst
mod_record.rst
mod_similarity.rst
.. mod_io.rst
Expand Down
29 changes: 15 additions & 14 deletions rltk/parallel_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,21 @@ def run(self):


class ParallelProcessor(object):
"""
Args:
input_handler (Callable): Computational function.
num_of_processor (int): Number of processes to use.
max_size_per_input_queue (int): Maximum size of input queue for one process.
If it's full, the corresponding process will be blocked.
0 by default means unlimited.
max_size_per_output_queue (int): Maximum size of output queue for one process.
If it's full, the corresponding process will be blocked.
0 by default means unlimited.
output_handler (Callable): If the output data needs to be get in main process (another thread),
set this handler, the arguments are same to the return from input_handler.
The return result is one by one, order is arbitrary.
"""

# Command format in queue. Represent in tuple.
# The first element of tuple will be command, the rests are arguments or data.
# (CMD_XXX, args...)
Expand All @@ -29,20 +44,6 @@ class ParallelProcessor(object):
def __init__(self, input_handler: Callable, num_of_processor: int,
max_size_per_input_queue: int = 0, max_size_per_output_queue: int = 0,
output_handler: Callable = None):
"""
Args:
input_handler (Callable): Computational function.
num_of_processor (int): Number of processes to use.
max_size_per_input_queue (int): Maximum size of input queue for one process.
If it's full, the corresponding process will be blocked.
0 by default means unlimited.
max_size_per_output_queue (int): Maximum size of output queue for one process.
If it's full, the corresponding process will be blocked.
0 by default means unlimited.
output_handler (Callable): If the output data needs to be get in main process (another thread),
set this handler, the arguments are same to the return from input_handler.
The return result is one by one, order is arbitrary.
"""
self.num_of_processor = num_of_processor
self.input_queues = [mp.Queue(maxsize=max_size_per_input_queue) for _ in range(num_of_processor)]
self.output_queues = [mp.Queue(maxsize=max_size_per_output_queue) for _ in range(num_of_processor)]
Expand Down

0 comments on commit 2edd0e4

Please sign in to comment.