Skip to content

Commit

Permalink
Clean up the event_server API to avoid using direct gen_server calls.
Browse files Browse the repository at this point in the history
  • Loading branch information
pmundkur committed May 29, 2012
1 parent 6fe38e1 commit c129a2c
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 36 deletions.
2 changes: 1 addition & 1 deletion master/src/disco_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ do_kill_job(JobName) ->
-spec do_clean_job(jobname()) -> ok.
%% Kill the job across the cluster, then ask the event server to purge
%% all state it holds for the job. Always returns ok.
do_clean_job(JobName) ->
    do_kill_job(JobName),
    %% Post-commit form: go through the event_server API wrapper rather
    %% than a raw gen_server:cast to the registered name.
    event_server:clean_job(JobName),
    ok.

% This is executed in its own process.
Expand Down
15 changes: 6 additions & 9 deletions master/src/disco_web.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ getop("load_config_table", _Query) ->
disco_config:get_config_table();

getop("joblist", _Query) ->
{ok, Jobs} = gen_server:call(event_server, get_jobs),
{ok, Jobs} = event_server:get_jobs(),
{ok, [[1000000 * MSec + Sec, list_to_binary(atom_to_list(Status)), Name]
|| {Name, Status, {MSec, Sec, _USec}, _Pid}
<- lists:reverse(lists:keysort(3, Jobs))]};

getop("jobinfo", {_Query, JobName}) ->
{ok, Active} = disco_server:get_active(JobName),
case gen_server:call(event_server, {get_jobinfo, JobName}) of
case event_server:get_jobinfo(JobName) of
{ok, JobInfo} ->
HostInfo = lists:unzip([{Host, M}
|| {Host, #task{mode = M}} <- Active]),
Expand All @@ -92,8 +92,7 @@ getop("jobevents", {Query, Name}) ->
false -> "";
{_, F} -> string:to_lower(F)
end,
{ok, Ev} = gen_server:call(event_server,
{get_job_events, Name, string:to_lower(Q), Num}),
{ok, Ev} = event_server:get_job_events(Name, string:to_lower(Q), Num),
{raw, Ev};

getop("nodeinfo", _Query) ->
Expand Down Expand Up @@ -147,7 +146,7 @@ getop("get_settings", _Query) ->
end, L))}};

getop("get_mapresults", {_Query, Name}) ->
case gen_server:call(event_server, {get_map_results, Name}) of
case event_server:get_map_results(Name) of
{ok, _Res} = OK ->
OK;
_ ->
Expand Down Expand Up @@ -191,7 +190,7 @@ postop("clean_job", Json) ->
end);

postop("get_results", Json) ->
Results = fun(N) -> gen_server:call(event_server, {get_results, N}) end,
Results = fun(N) -> event_server:get_results(N) end,
validate_payload("get_results", {array, [integer, {hom_array, string}]}, Json,
fun(J) ->
[Timeout, Names] = J,
Expand Down Expand Up @@ -322,7 +321,5 @@ wait_jobs(Jobs, Timeout) ->
receive {'DOWN', _, _, _, _} -> ok
after Timeout -> ok
end,
[{N, gen_server:call(event_server,
{get_results, binary_to_list(N)})}
|| {N, _} <- Jobs]
[{N, event_server:get_results(binary_to_list(N))} || {N, _} <- Jobs]
end.
80 changes: 59 additions & 21 deletions master/src/event_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,11 @@
-define(EVENT_BUFFER_SIZE, 1000).
-define(EVENT_BUFFER_TIMEOUT, 2000).

-export([new_job/2,
end_job/1,
event/4,
event/5,
event/6,
task_event/2,
task_event/3,
task_event/4,
task_event/5]).
-export([new_job/2, end_job/1, clean_job/1,
get_jobs/0, get_jobs/1, get_jobinfo/1, get_job_events/3,
get_map_results/1, get_results/1,
event/4, event/5, event/6,
task_event/2, task_event/3, task_event/4, task_event/5]).
-export([start_link/0,
init/1,
handle_call/3,
Expand All @@ -34,6 +30,47 @@ new_job(Prefix, JobCoordinator) ->
%% Notify the event server, asynchronously, that JobName has finished.
%% Fire-and-forget cast; how {job_done, JobName} is handled is defined
%% in the server's handle_cast clauses (not visible in this hunk).
end_job(JobName) ->
gen_server:cast(?MODULE, {job_done, JobName}).

%% One entry per known job, as returned by get_jobs/0,1.
-type joblist_entry() :: {JobName :: binary(),
                          process_status(),
                          StartTime :: erlang:timestamp(),
                          JobCoordinator :: pid()}.

%% List all jobs known to the event server on the local node.
-spec get_jobs() -> {ok, [joblist_entry()]}.
get_jobs() ->
    %% Delegate to get_jobs/1: {?MODULE, node()} addresses the same
    %% locally registered server as the bare ?MODULE atom does.
    get_jobs(node()).

%% Same as get_jobs/0, but addressed to the event server registered on
%% the given master node (used by temp_gc, which runs on slave nodes).
-spec get_jobs(Master :: node()) -> {ok, [joblist_entry()]}.
get_jobs(Master) ->
    Server = {?MODULE, Master},
    gen_server:call(Server, get_jobs).

%% Detailed per-job information, consumed by the "jobinfo" web request.
-type job_eventinfo() :: {StartTime :: erlang:timestamp(),
                          JobCoordinator :: pid(),
                          jobinfo(),
                          Results :: [job_coordinator:input()],
                          Ready :: string(),
                          Failed :: string()}.

%% Synchronously fetch the job's info record from the local event
%% server; per the spec, unknown names yield invalid_job.
-spec get_jobinfo(jobname()) -> invalid_job | {ok, job_eventinfo()}.
get_jobinfo(JobName) ->
    Request = {get_jobinfo, JobName},
    gen_server:call(?MODULE, Request).

%% Fetch up to N formatted event lines for JobName matching query Q.
%% The query is normalised to lower case before it reaches the server,
%% so callers need not (but harmlessly may) lowercase it themselves.
-spec get_job_events(jobname(), string(), integer()) -> {ok, [binary()]}.
get_job_events(JobName, Q, N) ->
    Query = string:to_lower(Q),
    gen_server:call(?MODULE, {get_job_events, JobName, Query, N}).

%% Fetch the map-phase results for JobName. Per the spec this can also
%% report invalid_job (unknown name) or not_ready (map not finished).
-spec get_map_results(jobname()) -> invalid_job | not_ready
       | {ok, [job_coordinator:input()]}.
get_map_results(JobName) ->
    Request = {get_map_results, JobName},
    gen_server:call(?MODULE, Request).

%% Fetch the final results for JobName. Per the spec: invalid_job for
%% unknown names, {ready, Pid, Results} when done, or the coordinator's
%% current {process_status(), Pid} while the job is still in flight.
-spec get_results(jobname()) -> invalid_job | {ready, pid(), [job_coordinator:input()]}
       | {process_status(), pid()}.
get_results(JobName) ->
    %% Use ?MODULE instead of the literal registered name, for
    %% consistency with every other API wrapper in this module.
    gen_server:call(?MODULE, {get_results, JobName}).

%% Ask the event server to drop all state it holds for JobName.
%% Asynchronous: gen_server:cast/2 returns ok immediately.
-spec clean_job(jobname()) -> ok.
clean_job(JobName) ->
    Msg = {clean_job, JobName},
    gen_server:cast(?MODULE, Msg).

-spec start_link() -> {ok, pid()}.
start_link() ->
lager:info("Event server starts"),
Expand All @@ -54,12 +91,12 @@ start_link() ->
% msgbuf dict: jobname -> { <nmsgs>, <list-length>, [<msg>] }
%

-type state() :: {dict(), dict()}.

-spec init(_) -> gs_init().
init(_Args) ->
_ = ets:new(event_files, [named_table]),
{ok, {dict:new(), dict:new()}}.
%% Status atoms used in job listings. Note: 'ready' is part of the type
%% but is produced elsewhere in this module, not by process_status/1.
-type process_status() :: active | dead | ready.

%% Classify a job-coordinator pid as active or dead by liveness.
process_status(Pid) ->
    alive_to_status(is_process_alive(Pid)).

%% Map the boolean liveness check onto the status atoms.
alive_to_status(true)  -> active;
alive_to_status(false) -> dead.

json_list(List) -> json_list(List, []).
json_list([], _) -> [];
Expand All @@ -84,6 +121,13 @@ unique_key(Prefix, Dict) ->
end
end.

%% Server state: a pair of dicts. Per the module comment above, the
%% second is the msgbuf dict (jobname -> {NMsgs, ListLen, [Msg]}); the
%% first presumably tracks per-job event records — confirm in handlers.
-type state() :: {dict(), dict()}.

-spec init(_) -> gs_init().
init(_Args) ->
    %% Named ETS table keyed by job; created here so its lifetime is
    %% tied to the event server process.
    _ = ets:new(event_files, [named_table]),
    Events = dict:new(),
    MsgBuf = dict:new(),
    {ok, {Events, MsgBuf}}.

-spec handle_call(term(), from(), state()) -> gs_reply(term()) | gs_noreply().

handle_call(get_jobs, _From, {Events, _MsgBuf} = S) ->
Expand Down Expand Up @@ -219,12 +263,6 @@ grep_log(JobName, Query, N) ->
" 2>/dev/null | head -n ", integer_to_list(N)]), "\n"),
[list_to_binary(L) || L <- lists:reverse(Lines)].

process_status(Pid) ->
case is_process_alive(Pid) of
true -> active;
false -> dead
end.

event_filter(Key, EventList) ->
[V || {K, V} <- EventList, K == Key].

Expand Down
4 changes: 3 additions & 1 deletion master/src/job_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
-type input() :: binary() | [binary()].
-type phase_input() :: {non_neg_integer(), [{input(), host()}]}.

-export_type([input/0]).

% In theory we could keep the HTTP connection pending until the job
% finishes but in practice long-living HTTP connections are a bad idea.
% Thus, the HTTP request spawns a new process, job_coordinator, that
Expand Down Expand Up @@ -78,7 +80,7 @@ job_coordinator(#jobinfo{jobname = JobName} = Job) ->
%% Record the error in the job's event log, kill the job cluster-wide,
%% close out its event stream, and exit the calling process with the
%% error term as the exit reason (this never returns).
kill_job(JobName, {EventFormat, Args, Params} = Error) ->
    job_event(JobName, {"ERROR: " ++ EventFormat, Args, Params}),
    disco_server:kill_job(JobName, 30000),
    %% Post-commit form: use the event_server API wrapper instead of a
    %% raw gen_server:cast of {job_done, JobName}.
    event_server:end_job(JobName),
    exit(Error);
%% Two-element variant: default to empty format parameters.
kill_job(JobName, {EventFormat, Args}) ->
    kill_job(JobName, {EventFormat, Args, {}}).
Expand Down
5 changes: 1 addition & 4 deletions master/src/temp_gc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ start_link(Master, DataRoot) ->

-spec loop(path()) -> no_return().
loop(DataRoot) ->
case catch {get_purged(), get_jobs()} of
case catch {get_purged(), event_server:get_jobs(get(master))} of
{{ok, Purged}, {ok, Jobs}} ->
case prim_file:list_dir(DataRoot) of
{ok, Dirs} ->
Expand Down Expand Up @@ -57,9 +57,6 @@ ddfs_delete(Tag) ->
get_purged() ->
disco_server:get_purged(get(master)).

get_jobs() ->
gen_server:call({event_server, get(master)}, get_jobs).

-spec process_dir(path(), [path()], gb_set(), gb_set()) -> ok.
process_dir(_DataRoot, [], _Purged, _Active) -> ok;
process_dir(DataRoot, [Dir|R], Purged, Active) ->
Expand Down

0 comments on commit c129a2c

Please sign in to comment.