Commit
Full reducer fault tolerance.
poles-p committed Jan 19, 2011
1 parent 25b2e81 commit e80e4b3
Showing 2 changed files with 106 additions and 13 deletions.
61 changes: 56 additions & 5 deletions src/map_worker.erl
@@ -50,11 +50,7 @@ run(MapFunction) ->
error_logger:info_msg("Notifying master mapper ~p is done.", [self()]),
MasterPid ! {self(), map_send_finished},

% TODO: fault tolerance protocol extension implementation.
receive
{_, map_reducing_complete} ->
error_logger:info_msg("Map-reducing finished. Quitting.", [])
end
additional_reduce_phase(ReducerPidsWithData)
end
end.

@@ -63,6 +59,38 @@ run(MapFunction) ->
%% Local Functions.
%%

%% @doc Waits for a 'map_reducing_complete' message or an additional recipe.
%% If it receives an additional recipe, it starts an additional reduce phase.
%% See the documentation for the protocol definition.
%% @spec (OldPartition) -> () where
%% OldPartition = dict()
%% @private
additional_reduce_phase(OldPartition) ->
    receive
        {_, map_reducing_complete} ->
            error_logger:info_msg("Map-reducing finished. Quitting.", []);

        {MasterPid, {recipe, Recipe, DeadReducerPids}} ->
            error_logger:info_msg("Received recipe; starting additional reduce phase; splitting lost data..."),

            ReducerPidsWithData = split_lost_data_among_reducers(OldPartition, DeadReducerPids, Recipe),

            ReducerPids = dict:fetch_keys(ReducerPidsWithData),

            error_logger:info_msg("Sending data to reducers ~p...",
                                  [ReducerPids]),
            send_data_to_reducers(ReducerPids, ReducerPidsWithData),

            error_logger:info_msg("Collecting acknowledgements from "
                                  "reducers ~p...", [ReducerPids]),
            collect_acknowledgements(ReducerPids),

            error_logger:info_msg("Notifying master mapper ~p is done.", [self()]),
            MasterPid ! {self(), map_send_finished},

            additional_reduce_phase(ReducerPidsWithData)
    end.
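
For reference, the master's side of one such round is the mirror image of the second receive clause above. A minimal sketch for a single mapper, assuming only the message shapes used in this diff (send_recipe_and_wait/3 is a hypothetical helper, not part of the commit; the real driver is execute_additional_reduce_phase/3 in master.erl below):

send_recipe_and_wait(MapperPid, Recipe, DeadReducerPids) ->
    %% Hand the mapper a new recipe plus the pids of the dead reducers whose
    %% data must be redistributed.
    MapperPid ! {self(), {recipe, Recipe, DeadReducerPids}},
    %% The mapper replies once the lost data has been resent and acknowledged
    %% by the surviving reducers.
    receive
        {MapperPid, map_send_finished} -> ok
    end.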

%% @doc Sends all data to reducers.
%% @spec (ReducerPids, ReducerPidsWithData) -> void() where
%% ReducerPids = [pid()],
@@ -144,3 +172,26 @@ split_data_among_reducers(Data, Recipe) ->
[{Key, Value}],
ReducerPidWithData)
end, dict:new(), Data).

%% @doc Given a recipe, creates a mapping from reducer pid to the list of data
%% to be sent to it, containing only the data that was sent to DeadReducerPids
%% in the previous iteration.
%% @spec (OldPartition, DeadReducerPids, Recipe) -> ReducerPidsWithData where
%% OldPartition = dict(),
%% DeadReducerPids = [pid()],
%% Recipe = K2 -> ReducerPid,
%% ReducerPidsWithData = dict()
%% @private
split_lost_data_among_reducers(OldPartition, DeadReducerPids, Recipe) ->
    lists:foldl(fun (DeadReducerPid, ReducerPidWithData) ->
                    Data = dict:fetch(DeadReducerPid, OldPartition),
                    lists:foldl(fun ({Key, Value}, Accumulator) ->
                                    DestinationReducerPid = Recipe(Key),
                                    dict:update(DestinationReducerPid,
                                                fun (OldData) ->
                                                    [{Key, Value} | OldData]
                                                end,
                                                [{Key, Value}],
                                                Accumulator)
                                end, ReducerPidWithData, Data)
                end, dict:new(), DeadReducerPids).
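
To illustrate the redistribution, a shell-style sketch with hypothetical pids and keys (not part of the module): only the pairs previously partitioned to the dead reducer are re-keyed through the new recipe.

Dead  = list_to_pid("<0.99.0>"),
Alive = list_to_pid("<0.42.0>"),
OldPartition = dict:from_list([{Dead,  [{k1, 1}, {k2, 2}]},
                               {Alive, [{k3, 3}]}]),
Recipe = fun (_Key) -> Alive end,
NewPartition = split_lost_data_among_reducers(OldPartition, [Dead], Recipe),
%% dict:fetch(Alive, NewPartition) =:= [{k2, 2}, {k1, 1}]
%% k3 was already delivered to a live reducer, so it is not resent.
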
58 changes: 50 additions & 8 deletions src/master.erl
@@ -15,7 +15,7 @@
execute_reduce_phase/1 % exported for testing purposes.
]).

-record(master_state, {alive_reducer_pids, dead_reducer_pids = []}).
-record(master_state, {alive_mapper_pids, alive_reducer_pids, dead_reducer_pids = []}).

%%
%% API Functions.
@@ -125,7 +125,8 @@ execute_map_phase(MapData, MapWorkerPids, ReduceWorkerPids, Recipe) ->
spawn(monitors, monitor_reduce_workers, [self(), ReduceWorkerPids]),

% TODO: create state in run function.
State = #master_state{alive_reducer_pids = ReduceWorkerPids},
State = #master_state{alive_mapper_pids = MapWorkerPids,
alive_reducer_pids = ReduceWorkerPids},

% Collect map_send_finished messages.
error_logger:info_msg("Collecting map_send_finished messages..."),
@@ -185,16 +186,58 @@ execute_reduce_phase(State) ->

collect_reduce_phase_results(State#master_state.alive_reducer_pids, [], State).

%% @doc Collects all results of the reduce phase. When one of the reduce workers
%% fails, the function splits its job among the other reducers (fault tolerance).
execute_additional_reduce_phase(CollectedResults, DeadReducerPids, State) ->
    error_logger:info_msg("Starting additional reduce phase; dead reduce workers: ~p",
                          [DeadReducerPids]),

    % Create a new recipe.
    Recipe = conf:recipe(State#master_state.alive_reducer_pids),

    % Send recipe to mappers.
    error_logger:info_msg("Sending the recipe to mappers ~p",
                          [State#master_state.alive_mapper_pids]),
    lists:foreach(fun (MapperPid) ->
                      MapperPid ! {self(), {recipe, Recipe, DeadReducerPids}}
                  end, State#master_state.alive_mapper_pids),

    % Collect 'map_send_finished' messages
    State2 = collect_map_send_finished(State#master_state.alive_mapper_pids,
                                       State),

    % Send start signal to reduce workers
    error_logger:info_msg("Sending start signal to reduce workers ~p",
                          [State2#master_state.alive_reducer_pids]),
    lists:foreach(fun (ReducerPid) ->
                      error_logger:info_msg(
                          "Sending start signal to reduce worker ~p",
                          [ReducerPid]),

                      ReducerPid ! {self(), start_reducing}
                  end,
                  State2#master_state.alive_reducer_pids),

    % Collect and return final results.
    error_logger:info_msg("Collecting final results from reduce workers ~p",
                          [State2#master_state.alive_reducer_pids]),

    collect_reduce_phase_results(State2#master_state.alive_reducer_pids, CollectedResults, State2).
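
The new recipe is built only from the reducers that are still alive, so every key previously owned by a dead reducer now maps onto a surviving one. The actual conf:recipe/1 is not part of this diff; a plausible hash-based sketch of such a recipe:

recipe(AliveReducerPids) ->
    N = length(AliveReducerPids),
    fun (Key) ->
        %% Deterministically pick one of the surviving reducers for this key.
        Index = erlang:phash2(Key, N) + 1,
        lists:nth(Index, AliveReducerPids)
    end.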

%% @doc Collects all results of the reduce phase. When one of the reduce workers
%% fails, the function starts an additional reduce phase.
%% @spec (RemainingReducerPids, Accumulator, State) -> FinalResult where
%% RemainingReducerPids = [pid()],
%% Accumulator = [{K3,V3}],
%% State = master_state,
%% FinalResult = [{K3,V3}]
%% @private
collect_reduce_phase_results([], Accumulator, _) ->
    Accumulator;
collect_reduce_phase_results([], Accumulator, State) ->
    case State#master_state.dead_reducer_pids of
        [] ->
            Accumulator;
        DeadReducerPids ->
            NewState = State#master_state{dead_reducer_pids = []},
            execute_additional_reduce_phase(Accumulator, DeadReducerPids, NewState)
    end;
collect_reduce_phase_results(RemainingReducerPids, Accumulator, State) ->
receive
{ReducerPid, {reduce_finished, ReduceResult}} ->
@@ -210,8 +253,6 @@ collect_reduce_phase_results(RemainingReducerPids, Accumulator, State) ->
"Reduce worker ~p is down",
[ReducerPid]),

% TODO: fault tolerance

NewRemainingReducerPids = lists:delete(ReducerPid, RemainingReducerPids),
NewState = reducer_failure_state_update(ReducerPid, State),

@@ -228,6 +269,7 @@ reducer_failure_state_update(ReducerPid, State) ->
DeadReducerPids = State#master_state.dead_reducer_pids,
State#master_state{alive_reducer_pids = lists:delete(ReducerPid, AliveReducerPids),
dead_reducer_pids = [ReducerPid | DeadReducerPids]}.

%% @doc Sends 'map_reducing_complete' message to all workers.
%% @spec (MapWorkerPids, ReduceWorkerPids) -> () where
%% MapWorkerPids = [pid()],