Skip to content

Commit

Permalink
Merge pull request #52 from puzza007/worker-pool
Browse files Browse the repository at this point in the history
Worker pool
  • Loading branch information
puzza007 committed May 1, 2017
2 parents f2568df + 0f46ded commit ab8bdbd
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 103 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ otp_release:

script: PATH=.:$PATH make update test dialyzer coveralls

install: wget https://github.com/erlang/rebar3/releases/download/3.3.1/rebar3 && chmod 755 rebar3
install: wget https://github.com/erlang/rebar3/releases/download/3.3.6/rebar3 && chmod 755 rebar3

cache:
directories:
Expand Down
4 changes: 2 additions & 2 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

{deps, [
{cowlib, "1.0.0"},
{gproc, "0.5.0"},
{worker_pool, "2.2.3"},
{metrics, "1.1.0"}
]}.

Expand All @@ -32,7 +32,7 @@

{dialyzer, [
{warnings, [no_return, unmatched_returns, error_handling]},
{plt_extra_apps, [cowlib, gproc]}
{plt_extra_apps, [cowlib, worker_pool]}
]}.

{shell, [{apps, [katipo]}]}.
Expand Down
8 changes: 4 additions & 4 deletions rebar.lock
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{"1.1.0",
[{<<"cowlib">>,{pkg,<<"cowlib">>,<<"1.0.0">>},0},
{<<"gproc">>,{pkg,<<"gproc">>,<<"0.5.0">>},0},
{<<"metrics">>,{pkg,<<"metrics">>,<<"1.1.0">>},0}]}.
{<<"metrics">>,{pkg,<<"metrics">>,<<"1.1.0">>},0},
{<<"worker_pool">>,{pkg,<<"worker_pool">>,<<"2.2.3">>},0}]}.
[
{pkg_hash,[
{<<"cowlib">>, <<"397D890D669E56D486B0B5329973AD1A07012412BC110D34A737698DD6941741">>},
{<<"gproc">>, <<"2DF2D886F8F8A7B81A4B04AA17972B5965BBC5BF0100EA6D8E8AC6A0E7389AFE">>},
{<<"metrics">>, <<"41450DFFBA18B1206BF44FA1AD4E29324D1E561617C2DAF4EF32E3CE50DF08F3">>}]}
{<<"metrics">>, <<"41450DFFBA18B1206BF44FA1AD4E29324D1E561617C2DAF4EF32E3CE50DF08F3">>},
{<<"worker_pool">>, <<"2CD7B2C289B900940297D283922D7E119C540CB8F29B5254639ABB9BFB100CAE">>}]}
].
5 changes: 2 additions & 3 deletions src/katipo.app.src
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
{application, 'katipo',
[{description, "HTTP client based on libcurl"},
{vsn, "0.4.0"},
{vsn, "0.5.0"},
{registered, []},
{mod, {'katipo_app', []}},
{applications,
[kernel,
stdlib,
cowlib,
gproc,
worker_pool,
metrics
]},
{env, [{mod_metrics, dummy}]},
Expand All @@ -24,7 +24,6 @@
"src/katipo_app.erl",
"src/katipo_metrics.erl",
"src/katipo_pool.erl",
"src/katipo_pool_sup.erl",
"src/katipo_session.erl",
"src/katipo_sup.erl"]}
]}.
14 changes: 6 additions & 8 deletions src/katipo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

-compile({no_auto_import,[put/2]}).

-export([start_link/3]).
-export([start_link/1]).

-export([init/1]).
-export([handle_call/3]).
Expand Down Expand Up @@ -316,8 +316,8 @@ req(PoolName, Opts)
Timeout = ?MODULE:get_timeout(Req),
Req2 = Req#req{timeout=Timeout},
Ts = os:timestamp(),
Pid = gproc_pool:pick_worker(PoolName),
{Result, {Response, Metrics}} = gen_server:call(Pid, Req2, infinity),
{Result, {Response, Metrics}} =
wpool:call(PoolName, Req2, best_worker, infinity),
TotalUs = timer:now_diff(os:timestamp(), Ts),
Response2 = maybe_return_metrics(Req2, Metrics, Response),
Ret = {Result, Response2},
Expand All @@ -328,17 +328,15 @@ req(PoolName, Opts)
Error
end.

start_link(PoolName, CurlOpts, WorkerId) when is_list(CurlOpts) andalso
is_atom(WorkerId) ->
Args = [PoolName, CurlOpts, WorkerId],
start_link(CurlOpts) when is_list(CurlOpts) ->
Args = [CurlOpts],
gen_server:start_link(?MODULE, Args, []).

init([PoolName, CurlOpts, WorkerId]) ->
init([CurlOpts]) ->
process_flag(trap_exit, true),
Args = get_mopts(CurlOpts),
Prog = filename:join([code:priv_dir(katipo), "katipo"]),
Port = open_port({spawn, Prog ++ " " ++ Args}, [{packet, 4}, binary]),
true = gproc_pool:connect_worker(PoolName, WorkerId),
{ok, #state{port=Port, reqs=#{}}}.

handle_call(#req{method = Method,
Expand Down
26 changes: 5 additions & 21 deletions src/katipo_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
-export([start/2]).
-export([start/3]).
-export([stop/1]).
-export([worker_name/2]).

-type name() :: atom().
-export_type([name/0]).
Expand All @@ -18,28 +17,13 @@ start(PoolName, PoolSize, WorkerOpts)
when is_atom(PoolName) andalso
is_integer(PoolSize) andalso
is_list(WorkerOpts) ->
Args = [PoolName, PoolSize, WorkerOpts],
Args = [WorkerOpts],

ChildSpec = #{id => PoolName,
start => {katipo_pool_sup, start_link, Args},
restart => permanent,
shutdown => 2000,
type => supervisor,
modules => [katipo_pool_sup]},
PoolOpts = [{worker, {katipo, Args}},
{workers, PoolSize}],

ok = gproc_pool:new(PoolName, round_robin, [{size, PoolSize}]),
_ = [gproc_pool:add_worker(PoolName, worker_name(PoolName, N))
|| N <- lists:seq(1, PoolSize)],

supervisor:start_child(katipo_sup, ChildSpec).
wpool:start_sup_pool(PoolName, PoolOpts).

-spec stop(name()) -> ok.
stop(PoolName) when is_atom(PoolName) ->
ok = supervisor:terminate_child(katipo_sup, PoolName),
true = gproc_pool:force_delete(PoolName),
ok = supervisor:delete_child(katipo_sup, PoolName).

-spec worker_name(name(), pos_integer()) -> atom().
worker_name(PoolName, N) when is_integer(N) ->
PoolNameList = atom_to_list(PoolName),
list_to_atom("katipo_" ++ PoolNameList ++ "_" ++ integer_to_list(N)).
wpool:stop_pool(PoolName).
28 changes: 0 additions & 28 deletions src/katipo_pool_sup.erl

This file was deleted.

61 changes: 25 additions & 36 deletions test/katipo_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
-include_lib("common_test/include/ct.hrl").

-define(POOL, katipo_test_pool).
-define(POOL_SIZE, 2).

suite() ->
[{timetrap, {seconds, 30}}].

init_per_suite(Config) ->
application:ensure_all_started(katipo),
application:ensure_all_started(meck),
{ok, _} = katipo_pool:start(?POOL, 2, []),
{ok, _} = katipo_pool:start(?POOL, ?POOL_SIZE),
Config.

end_per_suite(_Config) ->
Expand Down Expand Up @@ -110,7 +111,6 @@ groups() ->
proxy_couldnt_connect]},
{pool, [],
[pool_start_stop,
pool_death,
worker_death,
port_death,
port_late_response]},
Expand Down Expand Up @@ -413,47 +413,32 @@ pool_start_stop(_) ->
PoolSize = 2,
{ok, Pid} = katipo_pool:start(PoolName, PoolSize, []),
ok = katipo_pool:stop(PoolName),
receive
after 2500 ->
ok
end,
{ok, Pid2} = katipo_pool:start(PoolName, PoolSize, []),
ok = katipo_pool:stop(PoolName),
true = Pid =/= Pid2.

pool_death(_) ->
PoolName = die_pool,
PoolSize = 2,
{ok, Pid} = katipo_pool:start(PoolName, PoolSize, []),

Active = gproc_pool:active_workers(PoolName),
exit(Pid, kill),
Fun = fun() ->
whereis(PoolName) =/= Pid andalso
whereis(PoolName) =/= undefined
end,
true = repeat_until_true(Fun),
Fun2 = fun() ->
Active2 = gproc_pool:active_workers(?POOL),
[] == Active2 -- (Active2 -- Active)
end,
true = repeat_until_true(Fun2),
Fun3 = fun() ->
length(Active) == length(gproc_pool:active_workers(PoolName))
end,
true = repeat_until_true(Fun3),
Fun4 = fun() ->
{ok, #{status := 200}} = katipo:get(PoolName, <<"http://httpbin.org/get">>),
true
end,
true = repeat_until_true(Fun4).
active_workers() ->
Pids = [begin
Name = lists:flatten(io_lib:format("wpool_pool-~s-~B", [?POOL, N])),
NameAtom = list_to_existing_atom(Name),
whereis(NameAtom)
end || N <- lists:seq(1, ?POOL_SIZE)],
[P || P <- Pids, P /= undefined].

worker_death(_) ->
Active = gproc_pool:active_workers(?POOL),
_ = [exit(W, kill) || {_, W} <- Active],
Active = active_workers(),
_ = [exit(W, kill) || W <- Active],
Fun = fun() ->
Active2 = gproc_pool:active_workers(?POOL),
Active2 = active_workers(),
[] == Active2 -- (Active2 -- Active)
end,
true = repeat_until_true(Fun),
Fun2 = fun() ->
length(Active) == length(gproc_pool:active_workers(?POOL))
length(Active) == length(active_workers())
end,
true = repeat_until_true(Fun2),
Fun3 = fun() ->
Expand All @@ -465,12 +450,16 @@ worker_death(_) ->
port_death(_) ->
PoolName = this_process_will_be_killed,
PoolSize = 1,
{ok, _} = katipo_pool:start(PoolName, PoolSize, []),
{state, Port, _} = sys:get_state(gproc_pool:pick_worker(PoolName)),
{ok, _} = katipo_pool:start(PoolName, PoolSize),
WorkerName = wpool_pool:best_worker(PoolName),
WorkerPid = whereis(WorkerName),
{state, _, katipo, {state, Port, _}, _} = sys:get_state(WorkerPid),
true = port_command(Port, <<"hdfjkshkjsdfgjsgafdjgsdjgfj">>),
Fun = fun() ->
case sys:get_state(gproc_pool:pick_worker(PoolName)) of
{state, Port2, _} when Port =/= Port2 ->
WorkerName2 = wpool_pool:best_worker(PoolName),
WorkerPid2 = whereis(WorkerName2),
case sys:get_state(WorkerPid2) of
{state, _, katipo, {state, Port2, _}, _} when Port =/= Port2 ->
{ok, #{status := 200}} =
katipo:get(PoolName, <<"http://httpbin.org/get">>),
true
Expand Down

0 comments on commit ab8bdbd

Please sign in to comment.