diff --git a/.travis.yml b/.travis.yml index 0916b6d..1a283af 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,7 +10,7 @@ otp_release: script: PATH=.:$PATH make update test dialyzer coveralls -install: wget https://github.com/erlang/rebar3/releases/download/3.3.1/rebar3 && chmod 755 rebar3 +install: wget https://github.com/erlang/rebar3/releases/download/3.3.6/rebar3 && chmod 755 rebar3 cache: directories: diff --git a/rebar.config b/rebar.config index 3114e03..ae2d9ee 100644 --- a/rebar.config +++ b/rebar.config @@ -6,7 +6,7 @@ {deps, [ {cowlib, "1.0.0"}, - {gproc, "0.5.0"}, + {worker_pool, "2.2.3"}, {metrics, "1.1.0"} ]}. @@ -32,7 +32,7 @@ {dialyzer, [ {warnings, [no_return, unmatched_returns, error_handling]}, - {plt_extra_apps, [cowlib, gproc]} + {plt_extra_apps, [cowlib, worker_pool]} ]}. {shell, [{apps, [katipo]}]}. diff --git a/rebar.lock b/rebar.lock index 78148ae..a3f429a 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,10 +1,10 @@ {"1.1.0", [{<<"cowlib">>,{pkg,<<"cowlib">>,<<"1.0.0">>},0}, - {<<"gproc">>,{pkg,<<"gproc">>,<<"0.5.0">>},0}, - {<<"metrics">>,{pkg,<<"metrics">>,<<"1.1.0">>},0}]}. + {<<"metrics">>,{pkg,<<"metrics">>,<<"1.1.0">>},0}, + {<<"worker_pool">>,{pkg,<<"worker_pool">>,<<"2.2.3">>},0}]}. [ {pkg_hash,[ {<<"cowlib">>, <<"397D890D669E56D486B0B5329973AD1A07012412BC110D34A737698DD6941741">>}, - {<<"gproc">>, <<"2DF2D886F8F8A7B81A4B04AA17972B5965BBC5BF0100EA6D8E8AC6A0E7389AFE">>}, - {<<"metrics">>, <<"41450DFFBA18B1206BF44FA1AD4E29324D1E561617C2DAF4EF32E3CE50DF08F3">>}]} + {<<"metrics">>, <<"41450DFFBA18B1206BF44FA1AD4E29324D1E561617C2DAF4EF32E3CE50DF08F3">>}, + {<<"worker_pool">>, <<"2CD7B2C289B900940297D283922D7E119C540CB8F29B5254639ABB9BFB100CAE">>}]} ]. diff --git a/src/katipo.app.src b/src/katipo.app.src index a46f6fd..1637fa4 100644 --- a/src/katipo.app.src +++ b/src/katipo.app.src @@ -7,7 +7,7 @@ [kernel, stdlib, cowlib, - gproc, + worker_pool, metrics ]}, {env, [{mod_metrics, dummy}]}, diff --git a/src/katipo.erl b/src/katipo.erl index c6b464e..b72023c 100644 --- a/src/katipo.erl +++ b/src/katipo.erl @@ -4,7 +4,7 @@ -compile({no_auto_import,[put/2]}). --export([start_link/3]). +-export([start_link/1]). -export([init/1]). -export([handle_call/3]). @@ -312,8 +312,8 @@ req(PoolName, Opts) Timeout = ?MODULE:get_timeout(Req), Req2 = Req#req{timeout=Timeout}, Ts = os:timestamp(), - Pid = gproc_pool:pick_worker(PoolName), - {Result, {Response, Metrics}} = gen_server:call(Pid, Req2, infinity), + {Result, {Response, Metrics}} = + wpool:call(PoolName, Req2, best_worker, infinity), TotalUs = timer:now_diff(os:timestamp(), Ts), Response2 = maybe_return_metrics(Req2, Metrics, Response), Ret = {Result, Response2}, @@ -324,17 +324,15 @@ req(PoolName, Opts) Error end. -start_link(PoolName, CurlOpts, WorkerId) when is_list(CurlOpts) andalso - is_atom(WorkerId) -> - Args = [PoolName, CurlOpts, WorkerId], +start_link(CurlOpts) when is_list(CurlOpts) -> + Args = [CurlOpts], gen_server:start_link(?MODULE, Args, []). -init([PoolName, CurlOpts, WorkerId]) -> +init([CurlOpts]) -> process_flag(trap_exit, true), Args = get_mopts(CurlOpts), Prog = filename:join([code:priv_dir(katipo), "katipo"]), Port = open_port({spawn, Prog ++ " " ++ Args}, [{packet, 4}, binary]), - true = gproc_pool:connect_worker(PoolName, WorkerId), {ok, #state{port=Port, reqs=#{}}}. handle_call(#req{method = Method, diff --git a/src/katipo_pool.erl b/src/katipo_pool.erl index 8704592..1f492a7 100644 --- a/src/katipo_pool.erl +++ b/src/katipo_pool.erl @@ -3,7 +3,6 @@ -export([start/2]). -export([start/3]). -export([stop/1]). --export([worker_name/2]). -type name() :: atom(). -export_type([name/0]). @@ -18,28 +17,13 @@ start(PoolName, PoolSize, WorkerOpts) when is_atom(PoolName) andalso is_integer(PoolSize) andalso is_list(WorkerOpts) -> - Args = [PoolName, PoolSize, WorkerOpts], + Args = [WorkerOpts], - ChildSpec = #{id => PoolName, - start => {katipo_pool_sup, start_link, Args}, - restart => permanent, - shutdown => 2000, - type => supervisor, - modules => [katipo_pool_sup]}, + PoolOpts = [{worker, {katipo, Args}}, + {workers, PoolSize}], - ok = gproc_pool:new(PoolName, round_robin, [{size, PoolSize}]), - _ = [gproc_pool:add_worker(PoolName, worker_name(PoolName, N)) - || N <- lists:seq(1, PoolSize)], - - supervisor:start_child(katipo_sup, ChildSpec). + wpool:start_sup_pool(PoolName, PoolOpts). -spec stop(name()) -> ok. stop(PoolName) when is_atom(PoolName) -> - ok = supervisor:terminate_child(katipo_sup, PoolName), - true = gproc_pool:force_delete(PoolName), - ok = supervisor:delete_child(katipo_sup, PoolName). - --spec worker_name(name(), pos_integer()) -> atom(). -worker_name(PoolName, N) when is_integer(N) -> - PoolNameList = atom_to_list(PoolName), - list_to_atom("katipo_" ++ PoolNameList ++ "_" ++ integer_to_list(N)). + wpool:stop_pool(PoolName). diff --git a/src/katipo_pool_sup.erl b/src/katipo_pool_sup.erl deleted file mode 100644 index e8b3829..0000000 --- a/src/katipo_pool_sup.erl +++ /dev/null @@ -1,28 +0,0 @@ --module(katipo_pool_sup). - --behaviour(supervisor). - --export([start_link/3]). - --export([init/1]). - -start_link(PoolName, PoolSize, WorkerOpts) -> - supervisor:start_link({local, PoolName}, ?MODULE, - [PoolName, PoolSize, WorkerOpts]). - -init([PoolName, PoolSize, WorkerOpts]) -> - SupFlags = #{strategy => one_for_one, - intensity => 20, - period => 5}, - - Children = [begin - WorkerName = katipo_pool:worker_name(PoolName, N), - #{id => WorkerName, - start => {katipo, start_link, [PoolName, WorkerOpts, WorkerName]}, - restart => permanent, - shutdown => 2000, - type => worker, - modules => [katipo]} - end || N <- lists:seq(1, PoolSize)], - - {ok, {SupFlags, Children}}. diff --git a/test/katipo_SUITE.erl b/test/katipo_SUITE.erl index f644841..304a01c 100644 --- a/test/katipo_SUITE.erl +++ b/test/katipo_SUITE.erl @@ -5,6 +5,7 @@ -include_lib("common_test/include/ct.hrl"). -define(POOL, katipo_test_pool). +-define(POOL_SIZE, 2). suite() -> [{timetrap, {seconds, 30}}]. @@ -12,7 +13,7 @@ suite() -> init_per_suite(Config) -> application:ensure_all_started(katipo), application:ensure_all_started(meck), - {ok, _} = katipo_pool:start(?POOL, 2, []), + {ok, _} = katipo_pool:start(?POOL, ?POOL_SIZE), Config. end_per_suite(_Config) -> @@ -110,7 +111,6 @@ groups() -> proxy_couldnt_connect]}, {pool, [], [pool_start_stop, - pool_death, worker_death, port_death, port_late_response]}, @@ -413,47 +413,32 @@ pool_start_stop(_) -> PoolSize = 2, {ok, Pid} = katipo_pool:start(PoolName, PoolSize, []), ok = katipo_pool:stop(PoolName), + receive + after 2500 -> + ok + end, {ok, Pid2} = katipo_pool:start(PoolName, PoolSize, []), ok = katipo_pool:stop(PoolName), true = Pid =/= Pid2. -pool_death(_) -> - PoolName = die_pool, - PoolSize = 2, - {ok, Pid} = katipo_pool:start(PoolName, PoolSize, []), - - Active = gproc_pool:active_workers(PoolName), - exit(Pid, kill), - Fun = fun() -> - whereis(PoolName) =/= Pid andalso - whereis(PoolName) =/= undefined - end, - true = repeat_until_true(Fun), - Fun2 = fun() -> - Active2 = gproc_pool:active_workers(?POOL), - [] == Active2 -- (Active2 -- Active) - end, - true = repeat_until_true(Fun2), - Fun3 = fun() -> - length(Active) == length(gproc_pool:active_workers(PoolName)) - end, - true = repeat_until_true(Fun3), - Fun4 = fun() -> - {ok, #{status := 200}} = katipo:get(PoolName, <<"http://httpbin.org/get">>), - true - end, - true = repeat_until_true(Fun4). +active_workers() -> + Pids = [begin + Name = lists:flatten(io_lib:format("wpool_pool-~s-~B", [?POOL, N])), + NameAtom = list_to_existing_atom(Name), + whereis(NameAtom) + end || N <- lists:seq(1, ?POOL_SIZE)], + [P || P <- Pids, P /= undefined]. worker_death(_) -> - Active = gproc_pool:active_workers(?POOL), - _ = [exit(W, kill) || {_, W} <- Active], + Active = active_workers(), + _ = [exit(W, kill) || W <- Active], Fun = fun() -> - Active2 = gproc_pool:active_workers(?POOL), + Active2 = active_workers(), [] == Active2 -- (Active2 -- Active) end, true = repeat_until_true(Fun), Fun2 = fun() -> - length(Active) == length(gproc_pool:active_workers(?POOL)) + length(Active) == length(active_workers()) end, true = repeat_until_true(Fun2), Fun3 = fun() -> @@ -465,12 +450,16 @@ worker_death(_) -> port_death(_) -> PoolName = this_process_will_be_killed, PoolSize = 1, - {ok, _} = katipo_pool:start(PoolName, PoolSize, []), - {state, Port, _} = sys:get_state(gproc_pool:pick_worker(PoolName)), + {ok, _} = katipo_pool:start(PoolName, PoolSize), + WorkerName = wpool_pool:best_worker(PoolName), + WorkerPid = whereis(WorkerName), + {state, _, katipo, {state, Port, _}, _} = sys:get_state(WorkerPid), true = port_command(Port, <<"hdfjkshkjsdfgjsgafdjgsdjgfj">>), Fun = fun() -> - case sys:get_state(gproc_pool:pick_worker(PoolName)) of - {state, Port2, _} when Port =/= Port2 -> + WorkerName2 = wpool_pool:best_worker(PoolName), + WorkerPid2 = whereis(WorkerName2), + case sys:get_state(WorkerPid2) of + {state, _, katipo, {state, Port2, _}, _} when Port =/= Port2 -> {ok, #{status := 200}} = katipo:get(PoolName, <<"http://httpbin.org/get">>), true