Skip to content

Commit

Permalink
fix context is none when reducer is exiting
Browse files Browse the repository at this point in the history
  • Loading branch information
GreatYYX committed Feb 26, 2019
1 parent f8b1fe7 commit df47a9f
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions pyrallel/map_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ def reducer(r1, r2):
import sys
import logging

from pyrallel import Paralleller


logger = logging.getLogger('MapReduce')
logger.setLevel(logging.ERROR)
Expand All @@ -50,7 +52,7 @@ def reducer(r1, r2):
logger.addHandler(stdout_handler)


class MapReduce(object):
class MapReduce(Paralleller):
"""
Args:
num_of_process (int): Number of mappers and reducers.
Expand Down Expand Up @@ -274,13 +276,13 @@ def _run_reducer(self, idx):

# data
try:
if context is None: # can't use "not" operator here, context could be empty
if context is None: # can't use "not" operator here, context could be empty object (list, dict, ...)
context = self._reducer_queue.get(timeout=0.1)

m = self._reducer_queue.get(timeout=0.1)
context = self._reducer(context, m)
except queue.Empty:
# there are still some alive mapper, wait for their output
# there are still some alive mappers, wait for their output
if not no_running_mapper:
continue

Expand All @@ -292,6 +294,7 @@ def _run_reducer(self, idx):
continue
# kill itself, put context back to reducer queue
elif cmd[0] == self.__class__.CMD_REDUCER_KILL:
self._reducer_queue.put(context)
if context is not None:
self._reducer_queue.put(context)
self._manager_cmd_queue.put( (self.__class__.CMD_REDUCER_FINISH, idx) )
return

0 comments on commit df47a9f

Please sign in to comment.