Modify event_server state to store task stats directly.
pmundkur authored and srobertson committed Oct 7, 2012
1 parent fd089ba commit a0f611e
Showing 3 changed files with 96 additions and 70 deletions.
master/src/disco_web.erl (26 changes: 11 additions & 15 deletions)
@@ -64,8 +64,10 @@ getop("load_config_table", _Query) ->

getop("joblist", _Query) ->
{ok, Jobs} = event_server:get_jobs(),
{ok, [[1000000 * MSec + Sec, list_to_binary(atom_to_list(Status)), Name]
|| {Name, Status, {MSec, Sec, _USec}, _Pid}
{ok, [[1000000 * MSec + Sec,
list_to_binary(atom_to_list(Status)),
list_to_binary(Name)]
|| {Name, Status, {MSec, Sec, _USec}}
<- lists:reverse(lists:keysort(3, Jobs))]};

getop("jobinfo", {_Query, JobName}) ->
@@ -270,20 +272,14 @@ count_maps(L) ->

-spec render_jobinfo(event_server:job_eventinfo(), {[host()], [task_mode()]})
-> term().
render_jobinfo({Timestamp, Pid, JobInfo, Results, Ready, Failed},
render_jobinfo({Start, Status0, JobInfo, Results, Ready, Failed},
{Hosts, Modes}) ->
{NMapRun, NRedRun} = count_maps(Modes),
{NMapDone, NRedDone} = count_maps(Ready),
{NMapFail, NRedFail} = count_maps(Failed),

Status = case is_process_alive(Pid) of
true ->
<<"active">>;
false when Results == [] ->
<<"dead">>;
false ->
<<"ready">>
end,
NMapDone = dict:fetch(map, Ready),
NRedDone = dict:fetch(reduce, Ready),
NMapFail = dict:fetch(map, Failed),
NRedFail = dict:fetch(reduce, Failed),
Status = list_to_binary(atom_to_list(Status0)),

MapI = if
JobInfo#jobinfo.map ->
Expand All @@ -297,7 +293,7 @@ render_jobinfo({Timestamp, Pid, JobInfo, Results, Ready, Failed},
true -> 0
end,

{struct, [{timestamp, Timestamp},
{struct, [{timestamp, disco_util:format_timestamp(Start)},
{active, Status},
{mapi, [MapI, NMapRun, NMapDone, NMapFail]},
{redi, [RedI, NRedRun, NRedDone, NRedFail]},
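
Note on the rewritten render_jobinfo/2: with the new job_eventinfo(), Ready and Failed arrive as per-phase dicts and the status arrives as a job_status() atom already computed by event_server, so counting Ready and Failed via count_maps/1 and probing the coordinator with is_process_alive/1 both disappear. A rough, self-contained sketch of the shapes it now consumes (the counts and status are made up for illustration):

    Ready    = dict:from_list([{map, 4}, {reduce, 0}]),
    NMapDone = dict:fetch(map, Ready),                  % 4
    NRedDone = dict:fetch(reduce, Ready),               % 0
    Status   = list_to_binary(atom_to_list(active)).    % <<"active">>

Because dict:fetch/2 throws when the key is absent, event_server seeds both phases with a zero count when a job is created (see new_job_ent/2 further down), so these lookups cannot fail for a phase that never ran.
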
master/src/event_server.erl (136 changes: 83 additions & 53 deletions)
@@ -21,26 +21,27 @@
-define(EVENT_BUFFER_SIZE, 1000).
-define(EVENT_BUFFER_TIMEOUT, 2000).

-type event() :: {ready, [job_coordinator:input()]}
-type event() :: {job_data, jobinfo()}
| {task_ready, task_mode()}
| {task_failed, task_mode()}
| {map_ready, [job_coordinator:input()]}
| {reduce_ready, [job_coordinator:input()]}
| {job_data, jobinfo()}
| {task_ready, task_mode()}
| {task_failed, task_mode()}.
| {ready, [job_coordinator:input()]}.

-type task_msg() :: {binary(), term()} | string().

-type joblist_entry() :: {JobName :: binary(),
process_status(),
StartTime :: erlang:timestamp(),
JobCoordinator :: pid()}.
-type job_status() :: active | dead | ready.

-type job_eventinfo() :: {StartTime :: binary(),
JobCoordinator :: pid(),
jobinfo(),
-type joblist_entry() :: {JobName :: jobname(),
job_status(),
StartTime :: erlang:timestamp()}.

-type job_eventinfo() :: {StartTime :: erlang:timestamp(),
Status :: job_status(),
JobInfo :: none | jobinfo(),
Results :: [job_coordinator:input()],
Ready :: [task_mode()],
Failed :: [task_mode()]}.
Ready :: dict(),
Failed :: dict()}.

-export_type([event/0, task_msg/0, job_eventinfo/0]).
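
The practical effect of these type changes: joblist_entry() drops the coordinator pid, and job_eventinfo() carries a precomputed job_status() plus dict()-based counters instead of raw event lists. A hedged sketch of reading the new tuple, assuming the usual get_jobinfo/1 wrapper around the {get_jobinfo, JobName} call handled further down (the job name here is invented):

    {ok, {_Start, Status, _JobInfo, _Results, _Ready, Failed}} =
        event_server:get_jobinfo("ExampleJob@54a:1b2c3:d4e5f"),
    NRedFail = dict:fetch(reduce, Failed).
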

@@ -82,7 +83,7 @@ get_map_results(JobName) ->
gen_server:call(?MODULE, {get_map_results, JobName}).

-spec get_results(jobname()) -> invalid_job | {ready, pid(), [job_coordinator:input()]}
| {process_status(), pid()}.
| {job_status(), pid()}.
get_results(JobName) ->
gen_server:call(?MODULE, {get_results, JobName}).

@@ -159,7 +160,7 @@ handle_call({new_job, JobPrefix, Pid}, From, {Events0, MsgBuf0} = S) ->
invalid_prefix ->
{reply, {error, invalid_prefix}, S};
{ok, JobName} ->
Events = dict:store(JobName, {[], now(), Pid}, Events0),
Events = dict:store(JobName, new_job_ent(Pid), Events0),
MsgBuf = dict:store(JobName, {0, 0, []}, MsgBuf0),
spawn(fun() -> job_event_handler(JobName, From) end),
{noreply, {Events, MsgBuf}}
@@ -174,7 +175,8 @@ handle_call({get_map_results, JobName}, _From, {Events, _MsgBuf} = S) ->
{reply, do_get_map_results(JobName, Events), S};
handle_call({job_initialized, JobName, JobEventHandler}, _From, S) ->
ets:insert(event_files, {JobName, JobEventHandler}),
{reply, add_event("master", JobName, list_to_binary("\"New job initialized!\""), {}, S), S};
S1 = add_event("master", JobName, <<"\"New job initialized!\"">>, none, S),
{reply, ok, S1};
handle_call({get_jobinfo, JobName}, _From, {Events, _MsgBuf} = S) ->
{reply, do_get_jobinfo(JobName, Events), S}.

@@ -203,15 +205,60 @@ terminate(_Reason, _State) -> ok.
-spec code_change(term(), state(), term()) -> {ok, state()}.
code_change(_OldVsn, State, _Extra) -> {ok, State}.

% State management.

-record(job_ent, {job_coord :: pid(),
start :: erlang:timestamp(),
job_data = none :: none | jobinfo(),
task_ready :: dict(), % task_mode() -> count
task_failed :: dict(), % task_mode() -> count
phase_results :: dict(), % task_mode() -> results
job_results = [] :: [job_coordinator:input()]}).
-type job_ent() :: #job_ent{}.

-spec new_job_ent(pid()) -> job_ent().
new_job_ent(JobCoord) ->
new_job_ent(JobCoord, [map, reduce]).
new_job_ent(JobCoord, Phases) ->
Counts = [{P, 0} || P <- Phases],
TaskReady = dict:from_list(Counts),
TaskFailed = dict:from_list(Counts),
PhaseResults = dict:new(),
#job_ent{job_coord = JobCoord,
start = now(),
task_ready = TaskReady,
task_failed = TaskFailed,
phase_results = PhaseResults}.

-spec update_job_ent(job_ent(), event()) -> job_ent().
update_job_ent(JE, {job_data, JobData}) ->
JE#job_ent{job_data = JobData};
update_job_ent(#job_ent{task_ready = TaskReady} = JE, {task_ready, Phase}) ->
JE#job_ent{task_ready = dict:update_counter(Phase, 1, TaskReady)};
update_job_ent(#job_ent{task_failed = TaskFailed} = JE, {task_failed, Phase}) ->
JE#job_ent{task_failed = dict:update_counter(Phase, 1, TaskFailed)};
update_job_ent(#job_ent{phase_results = PhaseResults} = JE, {map_ready, Res}) ->
JE#job_ent{phase_results = dict:store(map, Res, PhaseResults)};
update_job_ent(#job_ent{phase_results = PhaseResults} = JE, {reduce_ready, Res}) ->
JE#job_ent{phase_results = dict:store(reduce, Res, PhaseResults)};
update_job_ent(JE, {ready, Results}) ->
JE#job_ent{job_results = Results}.

-spec job_status(job_ent()) -> job_status().
job_status(#job_ent{job_coord = Pid, job_results = []}) ->
case is_process_alive(Pid) of
true -> active;
false -> dead
end;
job_status(_JE) ->
ready.
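
The stats themselves are plain dict counters: each task_ready or task_failed event bumps the count for its phase via dict:update_counter/3, map_ready/reduce_ready store that phase's results, and ready records the final job results, which is what moves job_status/1 from active (or dead) to ready. A self-contained sketch of the counter idiom only, not the module's own code:

    Counts0 = dict:from_list([{map, 0}, {reduce, 0}]),
    Counts1 = lists:foldl(fun (Phase, D) -> dict:update_counter(Phase, 1, D) end,
                          Counts0,
                          [map, map, reduce]),    % e.g. three task_ready events
    2 = dict:fetch(map, Counts1),
    1 = dict:fetch(reduce, Counts1).
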

% Server implemention.

-spec do_get_jobs(dict()) -> {ok, [joblist_entry()]}.
do_get_jobs(Events) ->
Jobs = dict:fold(fun
(Name, {[{ready, _Results}|_], Start, Pid}, Acc) ->
[{list_to_binary(Name), ready, Start, Pid}|Acc];
(Name, {_Events, Start, Pid}, Acc) ->
[{list_to_binary(Name), process_status(Pid), Start, Pid}|Acc]
Jobs = dict:fold(fun (Name, #job_ent{start = Start} = JobEnt, Acc) ->
[{Name, job_status(JobEnt), Start}|Acc]
end, [], Events),
{ok, Jobs}.

@@ -231,16 +278,16 @@ do_get_job_msgs(JobName, Query, N0, MsgBuf) ->
end.

-spec do_get_results(jobname(), dict())
-> invalid_job | {process_status(), pid()}
-> invalid_job | {job_status(), pid()}
| {ready, pid(), [job_coordinator:input()]}.
do_get_results(JobName, Events) ->
case dict:find(JobName, Events) of
error ->
invalid_job;
{ok, {[{ready, Results}|_EventList], _JobStart, Pid}} ->
{ready, Pid, Results};
{ok, {_EventList, _JobStart, Pid}} ->
{process_status(Pid), Pid}
{ok, #job_ent{job_coord = Pid, job_results = Res}} when Res =/= [] ->
{ready, Pid, Res};
{ok, #job_ent{job_coord = Pid} = JE} ->
{job_status(JE), Pid}
end.

-spec do_get_map_results(jobname(), dict())
@@ -250,10 +297,10 @@ do_get_map_results(JobName, Events) ->
case dict:find(JobName, Events) of
error ->
invalid_job;
{ok, {EventList, _JobStart, _Pid}} ->
case event_filter(map_ready, EventList) of
[] -> not_ready;
[Res] -> {ok, Res}
{ok, #job_ent{phase_results = PR}} ->
case dict:find(map, PR) of
error -> not_ready;
{ok, _Res} = Ret -> Ret
end
end.
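
Where map results were previously recovered by scanning the job's event list for a map_ready event, they now sit under the map key of phase_results, so readiness is a single dict:find/2. The idiom in isolation (the stored result is an invented placeholder):

    PR0 = dict:new(),
    error = dict:find(map, PR0),                        % i.e. not_ready
    PR1 = dict:store(map, [<<"dir://example/map-out">>], PR0),
    {ok, _MapResults} = dict:find(map, PR1).
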

@@ -262,17 +309,9 @@ do_get_jobinfo(JobName, Events) ->
case dict:find(JobName, Events) of
error ->
invalid_job;
{ok, {EventList, JobStart, Pid}} ->
JobNfo =
case event_filter(job_data, EventList) of
[] -> [];
[N] -> N
end,
Results = event_filter(ready, EventList),
Ready = event_filter(task_ready, EventList),
Failed = event_filter(task_failed, EventList),
Start = disco_util:format_timestamp(JobStart),
{ok, {Start, Pid, JobNfo, Results, Ready, Failed}}
{ok, #job_ent{start = Start, job_data = JobNfo, job_results = Results,
task_ready = Ready, task_failed = Failed} = JE} ->
{ok, {Start, job_status(JE), JobNfo, Results, Ready, Failed}}
end.

-spec do_add_job_event(host(), jobname(), binary(), event(), state()) -> state().
@@ -307,8 +346,8 @@ add_event(Host0, JobName, Msg, Event, {Events, MsgBuf}) ->
Event =:= none ->
{Events, MsgBufN};
true ->
{ok, {EvLst0, Nu, Pid}} = dict:find(JobName, Events),
{dict:store(JobName, {[Event|EvLst0], Nu, Pid}, Events),
{ok, JE} = dict:find(JobName, Events),
{dict:store(JobName, update_job_ent(JE, Event), Events),
MsgBufN}
end.

@@ -361,13 +400,7 @@ flush_buffer(File, Buf) ->

% Misc utilities

-type process_status() :: active | dead | ready.
process_status(Pid) ->
case is_process_alive(Pid) of
true -> active;
false -> dead
end.

-spec json_list([binary()]) -> [binary()].
json_list(List) -> json_list(List, []).
json_list([], _) -> [];
json_list([X], L) ->
@@ -389,9 +422,6 @@ unique_key(Prefix, Dict) ->
end
end.

event_filter(Key, EventList) ->
[V || {K, V} <- EventList, K == Key].

task_format(Msg) when is_atom(Msg) or is_binary(Msg) or is_list(Msg) ->
"~s";
task_format(_Msg) ->
master/src/temp_gc.erl (4 changes: 2 additions & 2 deletions)
@@ -25,7 +25,7 @@ loop(DataRoot) ->
case prim_file:list_dir(DataRoot) of
{ok, Dirs} ->
Active = gb_sets:from_list(
[Name || {Name, active, _Start, _Pid} <- Jobs]),
[Name || {Name, active, _Start} <- Jobs]),
process_dir(DataRoot, Dirs, gb_sets:from_ordset(Purged), Active);
E ->
% fresh install, try again after GC_INTERVAL
@@ -68,7 +68,7 @@ process_dir(DataRoot, [Dir|R], Purged, Active) ->

-spec ifdead(jobname(), gb_set()) -> boolean().
ifdead(Job, Active) ->
not gb_sets:is_member(list_to_binary(Job), Active).
not gb_sets:is_member(Job, Active).

% Perform purge in one function so that gen_server errors can be
% caught by callers.
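
Related note: get_jobs/0 now reports the job name as a plain jobname() string rather than a binary, so the Active set built in loop/1 holds strings and ifdead/2 can test the on-disk directory name directly, without the old list_to_binary/1. Roughly (job names invented):

    Active = gb_sets:from_list(["Job1@abc", "Job2@def"]),
    true   = gb_sets:is_member("Job1@abc", Active),
    false  = gb_sets:is_member("OldJob@000", Active).   % candidate for purge
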
