Skip to content

Commit

Permalink
os_process_limit for query servers make them much more robust under c…
Browse files Browse the repository at this point in the history
…oncurrent load

git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@983291 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
jchris committed Aug 7, 2010
1 parent ccaa103 commit e18c591
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 66 deletions.
1 change: 1 addition & 0 deletions etc/couchdb/default.ini.tpl.in
Expand Up @@ -46,6 +46,7 @@ javascript = %bindir%/%couchjs_command_name% %localbuilddatadir%/server/main.js
; please let us know on the mailing list so we can fine tune the heuristic.
[query_server_config]
reduce_limit = true
os_process_limit = 25

; enable external as an httpd handler, then link it with commands here.
; note, this api is still under consideration.
Expand Down
224 changes: 158 additions & 66 deletions src/couchdb/couch_query_servers.erl
Expand Up @@ -35,6 +35,15 @@
stop_fun
}).

% State of the couch_query_servers gen_server. The first four fields hold
% the ids of the private ETS tables created in init/1.
-record(qserver, {
langs, % Keyed by language name, value is {Mod,Func,Arg}
pid_procs, % Keyed by PID, value is a #proc record.
lang_procs, % Keyed by language name, value is a list of idle #proc records
lang_limits, % Keyed by language name, value is {Lang, Limit, Current}; Limit 0 means no limit
waitlist = [], % get_proc callers still waiting, as {Info, From}, newest first
config % EJSON object built in init/1 (reduce_limit, timeout), returned with each proc
}).

% Starts the query server manager as a gen_server registered locally
% under the name couch_query_servers. init/1 receives an empty list.
start_link() ->
    ServerName = {local, couch_query_servers},
    gen_server:start_link(ServerName, couch_query_servers, [], []).

Expand Down Expand Up @@ -242,81 +251,97 @@ init([]) ->
supervisor:terminate_child(couch_secondary_services, query_servers),
[supervisor:restart_child(couch_secondary_services, query_servers)]
end),
ok = couch_config:register(
fun("query_server_config" ++ _, _) ->
supervisor:terminate_child(couch_secondary_services, query_servers),
supervisor:restart_child(couch_secondary_services, query_servers)
end),

Langs = ets:new(couch_query_server_langs, [set, private]),
LangLimits = ets:new(couch_query_server_lang_limits, [set, private]),
PidProcs = ets:new(couch_query_server_pid_langs, [set, private]),
LangProcs = ets:new(couch_query_server_procs, [set, private]),

ProcTimeout = list_to_integer(couch_config:get(
"couchdb", "os_process_timeout", "5000")),
ReduceLimit = list_to_atom(
couch_config:get("query_server_config","reduce_limit","true")),
OsProcLimit = list_to_integer(
couch_config:get("query_server_config","os_process_limit","10")),

% 'query_servers' specifies an OS command-line to execute.
lists:foreach(fun({Lang, Command}) ->
true = ets:insert(LangLimits, {?l2b(Lang), OsProcLimit, 0}),
true = ets:insert(Langs, {?l2b(Lang),
couch_os_process, start_link, [Command]})
end, couch_config:get("query_servers")),
% 'native_query_servers' specifies a {Module, Func, Arg} tuple.
lists:foreach(fun({Lang, SpecStr}) ->
{ok, {Mod, Fun, SpecArg}} = couch_util:parse_term(SpecStr),
true = ets:insert(LangLimits, {?l2b(Lang), 0, 0}), % 0 means no limit
true = ets:insert(Langs, {?l2b(Lang),
Mod, Fun, SpecArg})
end, couch_config:get("native_query_servers")),
process_flag(trap_exit, true),
{ok, {Langs, % Keyed by language name, value is {Mod,Func,Arg}
PidProcs, % Keyed by PID, valus is a #proc record.
LangProcs % Keyed by language name, value is a #proc record
}}.

terminate(_Reason, {_Langs, PidProcs, _LangProcs}) ->

process_flag(trap_exit, true),
{ok, #qserver{
langs = Langs, % Keyed by language name, value is {Mod,Func,Arg}
pid_procs = PidProcs, % Keyed by PID, valus is a #proc record.
lang_procs = LangProcs, % Keyed by language name, value is a #proc record
lang_limits = LangLimits, % Keyed by language name, value is {Lang, Limit, Current}
config = {[{<<"reduce_limit">>, ReduceLimit},{<<"timeout">>, ProcTimeout}]}
}}.

% gen_server shutdown callback: synchronously shuts down every process
% that is still registered in the pid_procs table, then returns ok.
terminate(_Reason, #qserver{pid_procs=PidProcs}) ->
    Tracked = ets:tab2list(PidProcs),
    _ = [couch_util:shutdown_sync(Pid) || {Pid, _Proc} <- Tracked],
    ok.

handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, _From, {Langs, PidProcs, LangProcs}=Server) ->
% Note to future self. Add max process limit.
handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, Server) ->
Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
case ets:lookup(LangProcs, Lang) of
[{Lang, [P|Rest]}] ->
% find a proc in the set that has the DDoc
{ok, Proc} = proc_with_ddoc(DDoc, DDocKey, [P|Rest]),
rem_from_list(LangProcs, Lang, Proc),
{reply, {ok, Proc, get_query_server_config()}, Server};
_ ->
case (catch new_process(Langs, Lang)) of
{ok, Proc} ->
add_value(PidProcs, Proc#proc.pid, Proc),
{ok, Proc2} = proc_with_ddoc(DDoc, DDocKey, [Proc]),
{reply, {ok, Proc2, get_query_server_config()}, Server};
Error ->
{reply, Error, Server}
end
case lang_proc(Lang, Server, fun(Procs) ->
% find a proc in the set that has the DDoc
proc_with_ddoc(DDoc, DDocKey, Procs)
end) of
{ok, Proc} ->
{reply, {ok, Proc, Server#qserver.config}, Server};
wait ->
{noreply, add_to_waitlist({DDoc, DDocKey}, From, Server)};
Error ->
{reply, Error, Server}
end;
handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs}=Server) ->
% Note to future self. Add max process limit.
case ets:lookup(LangProcs, Lang) of
[{Lang, [Proc|_]}] ->
rem_from_list(LangProcs, Lang, Proc),
{reply, {ok, Proc, get_query_server_config()}, Server};
_ ->
case (catch new_process(Langs, Lang)) of
{ok, Proc} ->
add_value(PidProcs, Proc#proc.pid, Proc),
{reply, {ok, Proc, get_query_server_config()}, Server};
Error ->
{reply, Error, Server}
end
handle_call({get_proc, Lang}, From, Server) ->
case lang_proc(Lang, Server, fun([P|_Procs]) ->
{ok, P}
end) of
{ok, Proc} ->
{reply, {ok, Proc, Server#qserver.config}, Server};
wait ->
{noreply, add_to_waitlist({Lang}, From, Server)};
Error ->
{reply, Error, Server}
end;
handle_call({unlink_proc, Pid}, _From, {_, PidProcs, _}=Server) ->
handle_call({unlink_proc, Pid}, _From, #qserver{pid_procs=PidProcs}=Server) ->
rem_value(PidProcs, Pid),
unlink(Pid),
{reply, ok, Server};
handle_call({ret_proc, Proc}, _From, {_, PidProcs, LangProcs}=Server) ->
handle_call({ret_proc, Proc}, _From, #qserver{
pid_procs=PidProcs,
lang_procs=LangProcs}=Server) ->
% Along with max process limit, here we should check
% if we're over the limit and discard when we are.
add_value(PidProcs, Proc#proc.pid, Proc),
add_to_list(LangProcs, Proc#proc.lang, Proc),
link(Proc#proc.pid),
{reply, true, Server}.
{reply, true, service_waitlist(Server)}.

% No casts are part of this server's protocol; any cast is ignored and
% the state is returned unchanged.
handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs}=Server) ->
handle_info({'EXIT', Pid, Status}, #qserver{
pid_procs=PidProcs,
lang_procs=LangProcs,
lang_limits=LangLimits}=Server) ->
case ets:lookup(PidProcs, Pid) of
[{Pid, Proc}] ->
case Status of
Expand All @@ -325,7 +350,9 @@ handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs}=Server) ->
end,
rem_value(PidProcs, Pid),
catch rem_from_list(LangProcs, Proc#proc.lang, Proc),
{noreply, Server};
[{Lang, Lim, Current}] = ets:lookup(LangLimits, Proc#proc.lang),
true = ets:insert(LangLimits, {Lang, Lim, Current-1}),
{noreply, service_waitlist(Server)};
[] ->
case Status of
normal ->
Expand All @@ -340,23 +367,90 @@ code_change(_OldVsn, State, _Extra) ->

% Private API

% Builds the EJSON config object handed out with a query server process.
% Reads "reduce_limit" from the [query_server_config] section (default
% "true") and converts the string to an atom.
get_query_server_config() ->
    Setting = couch_config:get("query_server_config", "reduce_limit", "true"),
    {[{<<"reduce_limit">>, list_to_atom(Setting)}]}.

new_process(Langs, Lang) ->
case ets:lookup(Langs, Lang) of
[{Lang, Mod, Func, Arg}] ->
{ok, Pid} = apply(Mod, Func, Arg),
{ok, #proc{lang=Lang,
pid=Pid,
% Called via proc_prompt, proc_set_timeout, and proc_stop
prompt_fun={Mod, prompt},
set_timeout_fun={Mod, set_timeout},
stop_fun={Mod, stop}}};
% Queues a caller that could not be given a process right away.
% Info is the request payload ({DDoc, DDocKey} or {Lang}) and From is the
% gen_server reply handle. New entries are pushed onto the front, so the
% list is ordered newest first.
add_to_waitlist(Info, From, #qserver{}=Server) ->
    Pending = Server#qserver.waitlist,
    Server#qserver{waitlist=[{Info, From}|Pending]}.

% Tries to serve the single oldest waiting caller, if there is one.
% The waitlist is stored newest first, so the oldest entry is the last
% element. When service_waiting/2 answers the caller (ok) the entry is
% dropped; when it returns wait the state is left untouched.
service_waitlist(#qserver{waitlist=[]}=Server) ->
    Server;
service_waitlist(#qserver{waitlist=Pending}=Server) ->
    % Split off the last (oldest) entry, keeping the newer ones in order.
    {Newer, [Oldest]} = lists:split(length(Pending) - 1, Pending),
    case service_waiting(Oldest, Server) of
        ok ->
            Server#qserver{waitlist=Newer};
        wait ->
            Server
    end.

% Attempts to hand a process to one queued caller.
% The first clause serves a design-document request ({DDoc, DDocKey}),
% the second a plain language request ({Lang}). Returns ok when the caller
% has been replied to (with a process or with an error) and wait when
% lang_proc/3 could not supply a process, in which case no reply is sent.
service_waiting({{#doc{body={Props}}=DDoc, DDocKey}, From}, Server) ->
    Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
    % pick a proc from the set that has the DDoc
    PickWithDDoc = fun(Procs) -> proc_with_ddoc(DDoc, DDocKey, Procs) end,
    reply_to_waiter(From, Server, lang_proc(Lang, Server, PickWithDDoc));
service_waiting({{Lang}, From}, Server) ->
    PickFirst = fun([P|_Rest]) -> {ok, P} end,
    reply_to_waiter(From, Server, lang_proc(Lang, Server, PickFirst)).

% Translates a lang_proc/3 result into a reply to the waiting caller.
reply_to_waiter(From, Server, {ok, Proc}) ->
    gen_server:reply(From, {ok, Proc, Server#qserver.config}),
    ok;
reply_to_waiter(_From, _Server, wait) ->
    % still nothing available for this caller; keep it queued
    wait;
reply_to_waiter(From, _Server, Error) ->
    gen_server:reply(From, Error),
    ok.

lang_proc(Lang, #qserver{
langs=Langs,
pid_procs=PidProcs,
lang_procs=LangProcs,
lang_limits=LangLimits}, PickFun) ->
% Note to future self. Add max process limit.
case ets:lookup(LangProcs, Lang) of
[{Lang, [P|Procs]}] ->
{ok, Proc} = PickFun([P|Procs]),
rem_from_list(LangProcs, Lang, Proc),
{ok, Proc};
_ ->
{unknown_query_language, Lang}
case (catch new_process(Langs, LangLimits, Lang)) of
{ok, Proc} ->
add_value(PidProcs, Proc#proc.pid, Proc),
{ok, Proc2} = PickFun([Proc]);
ErrorOrWait ->
ErrorOrWait
end
end.

% Starts a new query server process for Lang, subject to the per-language
% process limit.
%
% Langs      - ETS table of {Lang, Mod, Func, Arg} start specs.
% LangLimits - ETS table of {Lang, Limit, Current} counters; a Limit of 0
%              means the language has no limit.
%
% Returns:
%   {ok, #proc{}}                  - a process was started and the counter
%                                    for Lang was incremented;
%   wait                           - the language is already at its limit;
%   {unknown_query_language, Lang} - Lang is missing from either table.
%
% A language without a limits entry used to fail the lookup match with a
% badmatch, which callers saw as an 'EXIT' tuple instead of the
% unknown_query_language error; it is now reported explicitly.
% Still raises badmatch if the start function does not return {ok, Pid}.
new_process(Langs, LangLimits, Lang) ->
    case ets:lookup(LangLimits, Lang) of
        [{Lang, Lim, Current}] when Lim =:= 0; Current < Lim ->
            % we are below the limit for our language, make a new one
            case ets:lookup(Langs, Lang) of
                [{Lang, Mod, Func, Arg}] ->
                    {ok, Pid} = apply(Mod, Func, Arg),
                    % count the process only once it has actually started
                    true = ets:insert(LangLimits, {Lang, Lim, Current+1}),
                    {ok, #proc{lang=Lang,
                               pid=Pid,
                               % Called via proc_prompt, proc_set_timeout, and proc_stop
                               prompt_fun={Mod, prompt},
                               set_timeout_fun={Mod, set_timeout},
                               stop_fun={Mod, stop}}};
                _ ->
                    {unknown_query_language, Lang}
            end;
        [{Lang, _Lim, _Current}] ->
            % at the limit: the caller has to wait for a process to free up
            wait;
        [] ->
            {unknown_query_language, Lang}
    end.

proc_with_ddoc(DDoc, DDocKey, LangProcs) ->
Expand Down Expand Up @@ -402,12 +496,11 @@ teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) ->
get_ddoc_process(#doc{} = DDoc, DDocKey) ->
% remove this case statement
case gen_server:call(couch_query_servers, {get_proc, DDoc, DDocKey}) of
{ok, Proc, QueryConfig} ->
{ok, Proc, {QueryConfig}} ->
% process knows the ddoc
case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of
case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of
true ->
proc_set_timeout(Proc, list_to_integer(couch_config:get(
"couchdb", "os_process_timeout", "5000"))),
proc_set_timeout(Proc, couch_util:get_value(<<"timeout">>, QueryConfig)),
link(Proc#proc.pid),
gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}),
Proc;
Expand All @@ -421,11 +514,10 @@ get_ddoc_process(#doc{} = DDoc, DDocKey) ->

get_os_process(Lang) ->
case gen_server:call(couch_query_servers, {get_proc, Lang}) of
{ok, Proc, QueryConfig} ->
case (catch proc_prompt(Proc, [<<"reset">>, QueryConfig])) of
{ok, Proc, {QueryConfig}} ->
case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of
true ->
proc_set_timeout(Proc, list_to_integer(couch_config:get(
"couchdb", "os_process_timeout", "5000"))),
proc_set_timeout(Proc, couch_util:get_value(<<"timeout">>, QueryConfig)),
link(Proc#proc.pid),
gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}),
Proc;
Expand Down

0 comments on commit e18c591

Please sign in to comment.