Skip to content

Commit

Permalink
update doc
Browse files Browse the repository at this point in the history
  • Loading branch information
GreatYYX committed Feb 15, 2020
1 parent 14547bc commit d841110
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions pyrallel/parallel_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ def collector(data):
print(processed)
"""

import sys
import multiprocessing as mp
import threading
import queue
Expand Down Expand Up @@ -154,6 +153,7 @@ def run(self):

class ProgressThread(threading.Thread):
"""
Progress information in main process.
"""

P_ADDED = 0
Expand Down Expand Up @@ -202,6 +202,7 @@ class ParallelProcessor(Paralleller):
passed to `mapper` function. This has no effect for `Mapper` class.
It defaults to False.
batch_size (int, optional): Batch size, defaults to 1.
progress (Callable, optional): Progress inspection. Defaults to None.
Note:
Expand Down Expand Up @@ -344,6 +345,8 @@ def _update_progress(self, mapper, type_=None, finish=False):
if self.progress:
try:
if not finish:
# No need to ensure the status will be pulled from main process
# so if queue is full just skip this update
mapper._progress_info[type_] += 1
self.progress_queues[mapper._idx].put_nowait( (ParallelProcessor.CMD_DATA, mapper._progress_info) )
else:
Expand Down Expand Up @@ -376,6 +379,10 @@ def collect(self):
self.collector_queue_index = (self.collector_queue_index + 1) % len(self.collector_queues)

def get_progress(self):
"""
Get progress infomation from each mapper.
(main process)
"""
if not self.progress:
return

Expand All @@ -385,7 +392,7 @@ def get_progress(self):
if finish_num == self.num_of_processor:
return

# get next not None queue
# get next not-None queue
q = None
tmp_idx = idx
while not q:
Expand All @@ -394,7 +401,7 @@ def get_progress(self):
tmp_idx = (tmp_idx + 1) % self.num_of_processor

try:
data = q.get_nowait() # get out
data = q.get_nowait()
if data[0] == ParallelProcessor.CMD_STOP:
self.progress_queues[idx] = None # set to None if it's finished
finish_num += 1
Expand All @@ -405,4 +412,3 @@ def get_progress(self):
continue # find next available
finally:
idx = (idx + 1) % self.num_of_processor

0 comments on commit d841110

Please sign in to comment.