diff --git a/src/map_worker.erl b/src/map_worker.erl
index 3e89192..efe458e 100644
--- a/src/map_worker.erl
+++ b/src/map_worker.erl
@@ -50,11 +50,7 @@ run(MapFunction) ->
             error_logger:info_msg("Notifying master mapper ~p is done.", [self()]),
             MasterPid ! {self(), map_send_finished},
 
-            % TODO: fault tolerance protocol extension implemntation.
-            receive
-                {_, map_reducing_complete} ->
-                    error_logger:info_msg("Map-reducing finished. Quitting.", [])
-            end
+            additional_reduce_phase(ReducerPidsWithData)
         end
     end.
 
@@ -63,6 +59,38 @@ run(MapFunction) ->
 
 %% Local Functions.
 %%
+
+%% @doc Waits for a 'map_reducing_complete' message or an additional recipe.
+%%      If an additional recipe is received, starts an additional reduce phase.
+%%      See the documentation for the protocol definition.
+%% @spec (OldPartition) -> () where
+%%       OldPartition = dict()
+%% @private
+additional_reduce_phase(OldPartition) ->
+    receive
+        {_, map_reducing_complete} ->
+            error_logger:info_msg("Map-reducing finished. Quitting.", []);
+
+        {MasterPid, {recipe, Recipe, DeadReducerPids}} ->
+            error_logger:info_msg("Received recipe; starting additional reduce phase; splitting lost data..."),
+
+            ReducerPidsWithData = split_lost_data_among_reducers(OldPartition, DeadReducerPids, Recipe),
+
+            ReducerPids = dict:fetch_keys(ReducerPidsWithData),
+
+            error_logger:info_msg("Sending data to reducers ~p...",
+                                  [ReducerPids]),
+            send_data_to_reducers(ReducerPids, ReducerPidsWithData),
+
+            error_logger:info_msg("Collecting acknowledgements from "
+                                  "reducers ~p...", [ReducerPids]),
+            collect_acknowledgements(ReducerPids),
+
+            error_logger:info_msg("Notifying master mapper ~p is done.", [self()]),
+            MasterPid ! {self(), map_send_finished},
+
+            additional_reduce_phase(ReducerPidsWithData)
+    end.
+
 %% @doc Sends all data to reducers.
 %% @spec (ReducerPids, ReducerPidsWithData) -> void() where
 %%       ReducerPids = [pid()],
@@ -144,3 +172,26 @@ split_data_among_reducers(Data, Recipe) ->
                                                 [{Key, Value}],
                                                 ReducerPidWithData)
                 end, dict:new(), Data).
+
+%% @doc Given a recipe, creates a mapping from reducer pid to the list of data
+%%      to be sent to it, containing only the data that was sent to
+%%      DeadReducerPids in the previous iteration.
+%% @spec (OldPartition, DeadReducerPids, Recipe) -> ReducerPidsWithData where
+%%       OldPartition = dict(),
+%%       DeadReducerPids = [pid()],
+%%       Recipe = K2 -> ReducerPid,
+%%       ReducerPidsWithData = dict()
+%% @private
+split_lost_data_among_reducers(OldPartition, DeadReducerPids, Recipe) ->
+    lists:foldl(fun (DeadReducerPid, ReducerPidWithData) ->
+                        Data = dict:fetch(DeadReducerPid, OldPartition),
+                        lists:foldl(fun ({Key, Value}, Accumulator) ->
+                                            DestinationReducerPid = Recipe(Key),
+                                            dict:update(DestinationReducerPid,
+                                                        fun (OldData) ->
+                                                                [{Key, Value} | OldData]
+                                                        end,
+                                                        [{Key, Value}],
+                                                        Accumulator)
+                                    end, ReducerPidWithData, Data)
+                end, dict:new(), DeadReducerPids).
diff --git a/src/master.erl b/src/master.erl
index 4c67e7b..a69c1b9 100644
--- a/src/master.erl
+++ b/src/master.erl
@@ -15,7 +15,7 @@
          execute_reduce_phase/1 % exported for testing purporses.
         ]).
 
--record(master_state, {alive_reducer_pids, dead_reducer_pids = []}).
+-record(master_state, {alive_mapper_pids, alive_reducer_pids, dead_reducer_pids = []}).
 
 %%
 %% API Functions.
@@ -125,7 +125,8 @@ execute_map_phase(MapData, MapWorkerPids, ReduceWorkerPids, Recipe) ->
     spawn(monitors, monitor_reduce_workers, [self(), ReduceWorkerPids]),
 
     % TODO: create state in run function.
-    State = #master_state{alive_reducer_pids = ReduceWorkerPids},
+    State = #master_state{alive_mapper_pids = MapWorkerPids,
+                          alive_reducer_pids = ReduceWorkerPids},
 
     % Collect map_send_finished messages.
     error_logger:info_msg("Collecting map_send_finished messages..."),
@@ -185,16 +186,58 @@ execute_reduce_phase(State) ->
 
     collect_reduce_phase_results(State#master_state.alive_reducer_pids, [], State).
 
-%% @doc Collects all results of reducing phase. When one of reduce wokres fails,
-%%      function splits his job between other reducers (fault tolerance).
+execute_additional_reduce_phase(CollectedResults, DeadReducerPids, State) ->
+    error_logger:info_msg("Starting additional reduce phase; dead reduce workers: ~p",
+                          [DeadReducerPids]),
+
+    % Create a new recipe.
+    Recipe = conf:recipe(State#master_state.alive_reducer_pids),
+
+    % Send recipe to mappers.
+    error_logger:info_msg("Sending the recipe to mappers ~p",
+                          [State#master_state.alive_mapper_pids]),
+    lists:foreach(fun (MapperPid) ->
+                          MapperPid ! {self(), {recipe, Recipe, DeadReducerPids}}
+                  end, State#master_state.alive_mapper_pids),
+
+    % Collect 'map_send_finished' messages.
+    State2 = collect_map_send_finished(State#master_state.alive_mapper_pids,
+                                       State),
+
+    % Send start signal to reduce workers.
+    error_logger:info_msg("Sending start signal to reduce workers ~p",
+                          [State2#master_state.alive_reducer_pids]),
+    lists:foreach(fun (ReducerPid) ->
+                          error_logger:info_msg(
+                              "Sending start signal to reduce worker ~p",
+                              [ReducerPid]),
+
+                          ReducerPid ! {self(), start_reducing}
+                  end,
+                  State2#master_state.alive_reducer_pids),
+
+    % Collect and return final results.
+    error_logger:info_msg("Collecting final results from reduce workers ~p",
+                          [State2#master_state.alive_reducer_pids]),
+
+    collect_reduce_phase_results(State2#master_state.alive_reducer_pids, CollectedResults, State2).
+
+%% @doc Collects all results of the reduce phase. When one of the reduce workers
+%%      fails, the function starts an additional reduce phase.
 %% @spec (RemainingReducerPids, Accumulator, State) -> FinalResult where
 %%       RemainingReducerPids = [pid()],
 %%       Accumulator = [{K3,V3}],
 %%       State = master_state,
 %%       FinalResult = [{K3,V3}]
 %% @private
-collect_reduce_phase_results([], Accumulator, _) ->
-    Accumulator;
+collect_reduce_phase_results([], Accumulator, State) ->
+    case State#master_state.dead_reducer_pids of
+        [] ->
+            Accumulator;
+        DeadReducerPids ->
+            NewState = State#master_state{dead_reducer_pids = []},
+            execute_additional_reduce_phase(Accumulator, DeadReducerPids, NewState)
+    end;
 collect_reduce_phase_results(RemainingReducerPids, Accumulator, State) ->
     receive
         {ReducerPid, {reduce_finished, ReduceResult}} ->
@@ -210,8 +253,6 @@ collect_reduce_phase_results(RemainingReducerPids, Accumulator, State) ->
                                   "Reduce worker ~p is down", [ReducerPid]),
 
-            % TODO: fault tolerance
-
             NewRemainingReducerPids = lists:delete(ReducerPid, RemainingReducerPids),
 
             NewState = reducer_failure_state_update(ReducerPid, State),
 
@@ -228,6 +269,7 @@ reducer_failure_state_update(ReducerPid, State) ->
     DeadReducerPids = State#master_state.dead_reducer_pids,
     State#master_state{alive_reducer_pids = lists:delete(ReducerPid, AliveReducerPids),
                        dead_reducer_pids = [ReducerPid | DeadReducerPids]}.
+
 %% @doc Sends 'map_reducing_complete' message to all workers.
 %% @spec (MapWorkerPids, ReduceWorkerPids) -> () where
 %%       MapWorkerPids = [pid()],
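
A note on the recipe that execute_additional_reduce_phase/3 rebuilds and that the mappers apply in split_lost_data_among_reducers/3: a recipe is a fun mapping a reduce key to the pid of the reducer responsible for it, so re-partitioning the lost data only requires evaluating the new recipe on each orphaned key. The sketch below shows the shape such a recipe can take, assuming plain hash partitioning over the alive reducer pids; the project's actual conf:recipe/1 lives in conf.erl and is not part of this patch, so the body here is an illustrative assumption rather than the real implementation.

    %% Sketch only -- assumed hash-partitioning recipe, not the project's conf:recipe/1.
    %% Returns a fun that deterministically maps a key to one of the alive reducer pids.
    recipe(AliveReducerPids) ->
        NumReducers = length(AliveReducerPids),
        fun (Key) ->
                %% erlang:phash2/2 yields 0..NumReducers-1; lists:nth/2 is 1-based.
                Index = erlang:phash2(Key, NumReducers) + 1,
                lists:nth(Index, AliveReducerPids)
        end.

Any deterministic mapping works here; what matters for the protocol is that every mapper applies the same recipe instance received from the master, so all pairs sharing a key are resent to the same surviving reducer.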