Skip to content

Commit

Permalink
Tidy code a bit.
Browse files Browse the repository at this point in the history
  • Loading branch information
pmundkur authored and srobertson committed Oct 7, 2012
1 parent 7c69510 commit 9fcbeb1
Showing 1 changed file with 51 additions and 37 deletions.
88 changes: 51 additions & 37 deletions master/src/disco_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -241,29 +241,31 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}.
%% ===================================================================
%% exit handlers

process_exit(Pid, {Type, _} = Results, S) ->
case gb_trees:lookup(Pid, S#state.workers) of
process_exit(Pid, {Type, _} = Results, #state{workers = Workers,
nodes = Nodes} = S) ->
case gb_trees:lookup(Pid, Workers) of
none ->
nodemon_exit(Pid, S);
{_, {Host, Task}} ->
UWorkers = gb_trees:delete(Pid, S#state.workers),
UWorkers = gb_trees:delete(Pid, Workers),
Task#task.from ! {Results, Task, Host},
schedule_next(),
{noreply, update_stats(Host,
gb_trees:lookup(Host, S#state.nodes),
gb_trees:lookup(Host, Nodes),
Type,
S#state{workers = UWorkers})}
end.

-spec nodemon_exit(pid(), state()) -> gs_noreply().
nodemon_exit(Pid, S) ->
Iter = gb_trees:iterator(S#state.nodes),
nodemon_exit(Pid, #state{nodes = Nodes} = S) ->
Iter = gb_trees:iterator(Nodes),
nodemon_exit(Pid, S, gb_trees:next(Iter)).

nodemon_exit(Pid, S, {Host, #dnode{node_mon = Pid} = N, _Iter}) ->
nodemon_exit(Pid, #state{nodes = Nodes} = S,
{Host, #dnode{node_mon = Pid} = N, _Iter}) ->
lager:warning("Restarting monitor for ~p", [Host]),
N1 = N#dnode{node_mon = node_mon:start_link(Host)},
S1 = S#state{nodes = gb_trees:update(Host, N1, S#state.nodes)},
S1 = S#state{nodes = gb_trees:update(Host, N1, Nodes)},
{noreply, do_connection_status(Host, down, S1)};

nodemon_exit(Pid, S, {_Host, _N, Iter}) ->
Expand Down Expand Up @@ -305,19 +307,23 @@ update_nodes(Nodes) ->
-spec update_stats(host_name(), none | {value, dnode()}, _,
state()) -> state().
update_stats(_Node, none, _ReplyType, S) -> S;
update_stats(Node, {value, N}, ReplyType, S) ->
M = N#dnode{num_running = N#dnode.num_running - 1},
update_stats(Node, {value, #dnode{num_running = NumRunning,
stats_ok = StatsOk,
stats_failed = StatsFailed,
stats_crashed = StatsCrashed} = N},
ReplyType, #state{nodes = Nodes} = S) ->
M = N#dnode{num_running = NumRunning - 1},
M0 = case ReplyType of
done ->
M#dnode{stats_ok = M#dnode.stats_ok + 1};
M#dnode{stats_ok = StatsOk + 1};
error ->
M#dnode{stats_failed = M#dnode.stats_failed + 1};
M#dnode{stats_failed = StatsFailed + 1};
fatal ->
M#dnode{stats_crashed = M#dnode.stats_crashed + 1};
M#dnode{stats_crashed = StatsCrashed + 1};
_ ->
M#dnode{stats_crashed = M#dnode.stats_crashed + 1}
M#dnode{stats_crashed = StatsCrashed + 1}
end,
S#state{nodes = gb_trees:update(Node, M0, S#state.nodes)}.
S#state{nodes = gb_trees:update(Node, M0, Nodes)}.

-spec do_connection_status(host_name(), up | down, state()) -> state().
do_connection_status(Node, Status, #state{nodes = Nodes} = S) ->
Expand Down Expand Up @@ -348,12 +354,13 @@ do_manual_blacklist(Node, True, #state{nodes = Nodes} = S) ->

-spec do_update_config_table([disco_config:host_info()], [host_name()],
[host_name()], state()) -> state().
do_update_config_table(Config, Blacklist, GCBlacklist, S) ->
do_update_config_table(Config, Blacklist, GCBlacklist,
#state{nodes = Nodes} = S) ->
lager:info("Config table updated"),
NewNodes =
lists:foldl(fun({Host, Slots}, NewNodes) ->
NewNode =
case gb_trees:lookup(Host, S#state.nodes) of
case gb_trees:lookup(Host, Nodes) of
none ->
#dnode{host = Host,
node_mon = node_mon:start_link(Host),
Expand All @@ -371,14 +378,14 @@ do_update_config_table(Config, Blacklist, GCBlacklist, S) ->
gb_trees:insert(Host, NewNode, NewNodes)
end, gb_trees:empty(), Config),
lists:foreach(
fun(OldNode) ->
case gb_trees:lookup(OldNode#dnode.host, NewNodes) of
none ->
unlink(OldNode#dnode.node_mon),
exit(OldNode#dnode.node_mon, kill);
_ -> ok
end
end, gb_trees:values(S#state.nodes)),
fun(#dnode{host = Host, node_mon = NodeMon}) ->
case gb_trees:lookup(Host, NewNodes) of
none ->
unlink(NodeMon),
exit(NodeMon, kill);
_ -> ok
end
end, gb_trees:values(Nodes)),
disco_proxy:update_nodes(gb_trees:keys(NewNodes)),
update_nodes(NewNodes),
S1 = do_gc_blacklist(GCBlacklist, S),
Expand Down Expand Up @@ -443,8 +450,8 @@ do_new_job(JobName, JobCoord, _S) ->
catch gen_server:call(scheduler, {new_job, JobName, JobCoord}).

-spec do_new_task(task(), state()) -> ok | failed.
do_new_task(Task, S) ->
NodeStats = [case gb_trees:lookup(Node, S#state.nodes) of
do_new_task(Task, #state{nodes = Nodes}) ->
NodeStats = [case gb_trees:lookup(Node, Nodes) of
none -> {false, Input};
{value, N} -> {N#dnode.num_running, Input}
end || {_Url, Node} = Input <- Task#task.input],
Expand All @@ -467,15 +474,22 @@ do_get_active(JobName, #state{workers = Workers}) ->

-spec do_get_nodeinfo(state()) -> {ok, [nodeinfo()]}.
do_get_nodeinfo(#state{nodes = Nodes}) ->
Info = [#nodeinfo{name = N#dnode.host,
slots = N#dnode.slots,
num_running = N#dnode.num_running,
stats_ok = N#dnode.stats_ok,
stats_failed = N#dnode.stats_failed,
stats_crashed = N#dnode.stats_crashed,
connected = N#dnode.connection_status =:= up,
blacklisted = N#dnode.manual_blacklist}
|| N <- gb_trees:values(Nodes)],
Info = [#nodeinfo{name = Host,
slots = Slots,
num_running = NumRunning,
stats_ok = StatsOk,
stats_failed = StatsFailed,
stats_crashed = StatsCrashed,
connected = ConnectionStatus =:= up,
blacklisted = Blacklisted}
|| #dnode{host = Host,
slots = Slots,
num_running = NumRunning,
stats_ok = StatsOk,
stats_failed = StatsFailed,
stats_crashed = StatsCrashed,
connection_status = ConnectionStatus,
manual_blacklist = Blacklisted} <- gb_trees:values(Nodes)],
{ok, Info}.

-spec do_get_purged(state()) -> {{ok, [binary()]}, state()}.
Expand All @@ -489,7 +503,7 @@ do_get_purged(#state{purged = Purged} = S) ->

-spec do_get_num_cores(state()) -> {ok, cores()}.
do_get_num_cores(#state{nodes = Nodes}) ->
{ok, lists:sum([N#dnode.slots || N <- gb_trees:values(Nodes)])}.
{ok, lists:sum([Slots || #dnode{slots = Slots} <- gb_trees:values(Nodes)])}.

-spec do_kill_job(jobname()) -> ok.
do_kill_job(JobName) ->
Expand Down

0 comments on commit 9fcbeb1

Please sign in to comment.