Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

add support for a pool.

hackney is now supporting the use of a socket pool to reduce the use of
connections.

ex:

    Client = hackney:request(get, <<"https://friendpaste.com">>, [], <<>>,
                             [{pool, default}]).

Will use the default connection pool. You can reuse this pool later
using the same Client:

    hackney:send_request(Client, {get, <<"/">>, [], <<>>}).

Or do a new connection:

    Client = hackney:request(get, <<"https://friendpaste.com/_all_languages">>,
                             [], <<>>, [{pool, default}]).
  • Loading branch information...
commit 0bb952f08d3872098063398d63e9f09cc300f534 1 parent bf2fcac
@benoitc benoitc authored
View
1  .gitignore
@@ -3,3 +3,4 @@ ebin
*.sw*
deps
.DS_Store
+erl_crash.dump
View
14,804 erl_crash.dump
0 additions, 14,804 deletions not shown
View
50 examples/test1.erl
@@ -0,0 +1,50 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+%%! -pa ./ebin -pa ./deps/mimetypes/ebin
+
+-module(test1).
+
+
+
+main(_) ->
+ hackney:start(),
+
+ {ok, _, _, Client} = hackney:request(get, <<"https://friendpaste.com">>,
+ [], <<>>, [{pool, default}]),
+ {ok, Body, Client1} = hackney:body(Client),
+
+ io:format("body: ~p~n~n", [Body]),
+
+ {ok, _, _, Client2} = hackney:send_request(Client1, {get,
+ <<"/_all_languages">>,
+ [],
+ <<>>}),
+
+ {ok, Body1, Client3} = hackney:body(Client2),
+
+ io:format("body: ~p~n~n", [Body1]),
+
+
+ ReqBody = << "{
+ \"id\": \"some_paste_id\",
+ \"rev\": \"some_revision_id\",
+ \"changeset\": \"changeset in unidiff format\"
+ }" >>,
+
+ ReqHeaders = [{<<"Content-Type">>, <<"application/json">>}],
+
+ {ok, _, _, Client4} = hackney:send_request(Client3, {post, <<"/">>,
+ ReqHeaders,
+ ReqBody}),
+ {ok, Body2, Client5} = hackney:body(Client4),
+ io:format("body: ~p~n~n", [Body2]),
+
+ ReqBody1 = {file, "./examples/test.json"},
+
+ {ok, _, _, Client6} = hackney:send_request(Client5, {post, <<"/">>,
+ ReqHeaders,
+ ReqBody1}),
+ {ok, Body3, Client7} = hackney:body(Client6),
+ io:format("body: ~p~n~n", [Body3]),
+
+ hackney:close(Client7).
View
7 src/hackney.app.src
@@ -5,9 +5,8 @@
[
{description, "simple HTTP client"},
{vsn, "0.1.0"},
- {registered, []},
+ {registered, [hackney_pool]},
{applications, [kernel, stdlib, crypto, public_key, ssl]},
- {env, []},
- {mod, { hackney_app, []}},
- {env, []}
+ {mod, { hackney_app, nil}},
+ {env, [{timeout, 150000}, {pool_size, 50}]}
]}.
View
103 src/hackney.erl
@@ -14,7 +14,8 @@
request/1, request/2, request/3, request/4, request/5,
send_request/2,
stream_body/1,
- body/1, body/2, skip_body/1]).
+ body/1, body/2, skip_body/1,
+ pool/1]).
-include("hackney.hrl").
@@ -29,9 +30,8 @@ start() ->
stop() ->
application:stop(hackney).
-
connect(#client{state=connected}=Client) ->
- Client;
+ {ok, Client};
connect(#client{state=closed}=Client) ->
#client{transport=Transport, host=Host, port=Port} = Client,
@@ -43,29 +43,13 @@ connect(Transport, Host, Port) ->
connect(_Transport, _Host, _Port, #client{state=connected}=Client) ->
{ok, Client};
-connect(Transport, Host, Port, #client{options=Options,
- socket=Skt0}=Client)
- when is_list(Host), is_integer(Port), Skt0 =:= nil ->
-
- ConnectOpts0 = proplists:get_value(connect_options, Options, []),
-
- %% handle ipv6
- ConnectOpts = case hackney_util:is_ipv6(Host) of
- true ->
- [inet6 | ConnectOpts0];
- false ->
- ConnectOpts0
- end,
-
- case Transport:connect(Host, Port, ConnectOpts) of
- {ok, Skt} ->
- {ok, Client#client{transport=Transport,
- host=Host,
- port=Port,
- socket=Skt,
- state = connected}};
- Error ->
- Error
+connect(Transport, Host, Port, #client{socket=Skt}=Client)
+ when is_list(Host), is_integer(Port), Skt =:= nil ->
+ case pool(Client) of
+ undefined ->
+ do_connect(Transport, Host, Port, Client);
+ Pool ->
+ socket_from_pool(Pool, {Transport, Host, Port}, Client)
end;
connect(Transport, Host, Port, Options) when is_list(Options) ->
connect(Transport, Host, Port, #client{options=Options}).
@@ -152,14 +136,18 @@ send_request(#client{response_state=done}=Client0 ,
body_state=waiting},
send_request(Client, {Method, Path, Headers, Body});
-send_request(Client, {Method, Path, Headers, Body}) ->
-
- case {Client#client.response_state, Client#client.body_state} of
- {on_status, waiting} ->
- hackney_request:perform(connect(Client),
- {Method, Path, Headers, Body});
- _ ->
- {error, bad_response_state}
+send_request(Client0, {Method, Path, Headers, Body}) ->
+ case connect(Client0) of
+ {ok, Client} ->
+ case {Client#client.response_state, Client#client.body_state} of
+ {on_status, waiting} ->
+ hackney_request:perform(Client,
+ {Method, Path, Headers, Body});
+ _ ->
+ {error, bad_response_state}
+ end;
+ Error ->
+ Error
end.
@@ -184,3 +172,50 @@ body(MaxLength, Client) ->
-spec skip_body(#client{}) -> {ok, #client{}} | {error, atom()}.
skip_body(Client) ->
hackney_response:skip_body(Client).
+
+
+%% @doc get current pool pid or name used by a client if needed
+pool(#client{options=Opts}) ->
+ case proplists:get_value(pool, Opts) of
+ undefined ->
+ undefined;
+ default ->
+ whereis(hackney_pool);
+ Pool ->
+ Pool
+ end.
+
+
+socket_from_pool(Pool, {Transport, Host, Port}=Key, Client) ->
+ case hackney_pool:socket(Pool, Key) of
+ {ok, Skt} ->
+ {ok, Client#client{transport=Transport,
+ host=Host,
+ port=Port,
+ socket=Skt,
+ state = connected}};
+ no_socket ->
+ do_connect(Transport, Host, Port, Client)
+ end.
+
+do_connect(Transport, Host, Port, #client{options=Options}=Client) ->
+ ConnectOpts0 = proplists:get_value(connect_options, Options, []),
+
+ %% handle ipv6
+ ConnectOpts = case hackney_util:is_ipv6(Host) of
+ true ->
+ [inet6 | ConnectOpts0];
+ false ->
+ ConnectOpts0
+ end,
+
+ case Transport:connect(Host, Port, ConnectOpts) of
+ {ok, Skt} ->
+ {ok, Client#client{transport=Transport,
+ host=Host,
+ port=Port,
+ socket=Skt,
+ state = connected}};
+ Error ->
+ Error
+ end.
View
192 src/hackney_pool.erl
@@ -0,0 +1,192 @@
+%%% -*- erlang -*-
+%%%
+%%% This file is part of hackney released under the Apache 2 license.
+%%% See the NOTICE for more information.
+%%%
+%%% Copyright (c) 2012 Benoît Chesneau <benoitc@e-engura.org>
+
+%% @doc pool of sockets connections
+%%
+-module(hackney_pool).
+-behaviour(gen_server).
+
+
+-export([start_link/0, start_link/1]).
+-export([socket/2, release/3,
+ pool_size/1, pool_size/2]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ code_change/3, terminate/2]).
+
+-record(state, {
+ pool_size,
+ timeout,
+ connections = dict:new(),
+ sockets = dict:new()}).
+
+
+socket(PidOrName, {Transport, Host0, Port}) ->
+ Host = string:to_lower(Host0),
+ Pid = self(),
+ gen_server:call(PidOrName, {socket, {Transport, Host, Port}, Pid}).
+
+release(PidOrName, {Transport, Host0, Port}, Socket) ->
+ Host = string:to_lower(Host0),
+ gen_server:call(PidOrName, {release, {Transport, Host, Port}, Socket}).
+
+
+pool_size(PidOrName) ->
+ gen_server:call(PidOrName, pool_size).
+
+pool_size(PidOrName, {Transport, Host0, Port}) ->
+ Host = string:to_lower(Host0),
+ gen_server:call(PidOrName, {pool_size, {Transport, Host, Port}}).
+
+start_link() ->
+ start_link([]).
+
+start_link(Options0) ->
+ Options = maybe_apply_defaults([pool_size, timeout], Options0),
+ case proplists:get_value(name, Options) of
+ undefined ->
+ gen_server:start_link(?MODULE, Options, []);
+ Name ->
+ gen_server:start_link({local, Name}, ?MODULE, Options, [])
+ end.
+
+init(Options) ->
+ PoolSize = proplists:get_value(pool_size, Options),
+ Timeout = proplists:get_value(timeout, Options),
+ {ok, #state{pool_size=PoolSize, timeout=Timeout}}.
+
+
+handle_call({socket, Key, Pid}, _From, State) ->
+ {Reply, NewState} = find_connection(Key, Pid, State),
+ {reply, Reply, NewState};
+handle_call({release, Key, Socket}, _From, State) ->
+ NewState = store_connection(Key, Socket, State),
+ {reply, ok, NewState};
+handle_call(pool_size, _From, #state{sockets=Sockets}=State) ->
+ {ok, dict:size(Sockets), State};
+handle_call({pool_size, Key}, _From, #state{connections=Conns}=State) ->
+ Size = case dict:find(Key, Conns) of
+ {ok, Sockets} ->
+ length(Sockets);
+ error ->
+ 0
+ end,
+ {ok, Size, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({timeout, Socket}, State) ->
+ {noreply, remove_socket(Socket, State)};
+handle_info({tcp_closed, Socket}, State) ->
+ {noreply, remove_socket(Socket, State)};
+handle_info({ssl_closed, Socket}, State) ->
+ {noreply, remove_socket(Socket, State)};
+handle_info({tcp_error, Socket, _}, State) ->
+ {noreply, remove_socket(Socket, State)};
+handle_info({ssl_error, Socket, _}, State) ->
+ {noreply, remove_socket(Socket, State)};
+handle_info({tcp, Socket, _}, State) ->
+ {noreply, remove_socket(Socket, State)};
+handle_info({ssl, Socket, _}, State) ->
+ {noreply, remove_socket(Socket, State)};
+handle_info(_, State) ->
+ {noreply, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Reason, #state{sockets=Sockets}) ->
+ lists:foreach(fun(Socket, {{Transport, _, _}, Timer}) ->
+ cancel_timer(Socket, Timer),
+ Transport:close(Socket)
+ end, dict:to_list(Sockets)),
+ ok.
+
+%% internals
+
+find_connection({Transport, _Host, _Port}=Key, Pid,
+ #state{connections=Conns, sockets=Sockets}=State) ->
+ case dict:find(Key, Conns) of
+ {ok, [S | Rest]} ->
+ Transport:setopts(S, [{active, false}]),
+ case Transport:controlling_process(S, Pid) of
+ ok ->
+ {_, Timer} = dict:fetch(S, Sockets),
+ cancel_timer(S, Timer),
+ NewConns = update_connections(Rest, Key, Conns),
+ NewSockets = dict:erase(S, Sockets),
+ NewState = State#state{connections=NewConns,
+ sockets=NewSockets},
+ {{ok, S}, NewState};
+ {error, badarg} ->
+ Transport:setopts(S, [{active, once}]),
+ {no_socket, State};
+ _ ->
+ find_connection(Key, Pid, remove_socket(S, State))
+ end;
+ error ->
+ {no_socket, State}
+ end.
+
+remove_socket(Socket, #state{connections=Conns, sockets=Sockets}=State) ->
+ case dict:find(Socket, Sockets) of
+ {Key, Timer} ->
+ cancel_timer(Socket, Timer),
+ ConnSockets = lists:delete(Socket, dict:fetch(Key, Conns)),
+ NewConns = update_connections(ConnSockets, Key, Conns),
+ NewSockets = dict:erase(Socket, Sockets),
+ State#state{connections=NewConns, sockets=NewSockets};
+ false ->
+ State
+ end.
+
+
+store_connection({Transport, _, _} = Key, Socket,
+ #state{timeout=Timeout, connections=Conns,
+ sockets=Sockets}=State) ->
+ Timer = erlang:send_after(Timeout, self(), {timeout, Socket}),
+ Transport:setopts(Socket, [{active, once}]),
+ ConnSockets = case dict:find(Key, Conns) of
+ {ok, OldSockets} ->
+ [Socket | OldSockets];
+ error -> [Socket]
+ end,
+
+ State#state{connections = dict:store(Key, ConnSockets, Conns),
+ sockets = dict:store(Socket, {Key, Timer}, Sockets)}.
+
+
+update_connections([], Key, Connections) ->
+ dict:erase(Key, Connections);
+update_connections(Sockets, Key, Connections) ->
+ dict:store(Key, Sockets, Connections).
+
+
+maybe_apply_defaults([], Options) ->
+ Options;
+maybe_apply_defaults([OptName | Rest], Options) ->
+ case proplists:is_defined(OptName, Options) of
+ true ->
+ maybe_apply_defaults(Rest, Options);
+ false ->
+ {ok, Default} = application:get_env(hackney, OptName),
+ maybe_apply_defaults(Rest, [{OptName, Default} | Options])
+ end.
+
+
+cancel_timer(Socket, Timer) ->
+ case erlang:cancel_timer(Timer) of
+ false ->
+ receive
+ {timeout, Socket} -> ok
+ after
+ 0 -> ok
+ end;
+ _ -> ok
+ end.
View
35 src/hackney_pool_sup.erl
@@ -1,35 +0,0 @@
-%%% -*- erlang -*-
-%%%
-%%% This file is part of hackney released under the Apache 2 license.
-%%% See the NOTICE for more information.
-%%%
-%%% Copyright (c) 2012 Benoît Chesneau <benoitc@e-engura.org>
-
-
--module(hackney_pool_sup).
--behaviour(supervisor).
-
-
-%% API
--export([start_link/0]).
-
-%% Supervisor callbacks
--export([init/1]).
-
-%% Helper macro for declaring children of supervisor
--define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
-
-%% ===================================================================
-%% API functions
-%% ===================================================================
-
-start_link() ->
- supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-
-%% ===================================================================
-%% Supervisor callbacks
-%% ===================================================================
-
-init([]) ->
- {ok, { {one_for_one, 5, 10}, []} }.
-
View
8 src/hackney_response.erl
@@ -201,9 +201,15 @@ transfer_decode_done(_Length, Rest, Client0) ->
body_state=done,
buffer=Rest},
+ Pool = hackney:pool(Client),
case maybe_close(Client) of
true ->
close(Client);
+ false when Pool /= undefined ->
+ #client{host=Host, port=Port, transport=Transport,
+ socket=Socket}=Client,
+ hackney_pool:release(Pool, {Transport, Host, Port}, Socket),
+ Client#client{state=closed, socket=nil};
false ->
Client
end.
@@ -290,6 +296,8 @@ recv(#client{transport=Transport, socket=Skt}) ->
Transport:recv(Skt, 0).
+close(#client{socket=nil}=Client) ->
+ Client#client{state = closed};
close(#client{transport=Transport, socket=Skt}=Client) ->
Transport:close(Skt),
Client#client{state = closed, socket=nil}.
View
6 src/hackney_sup.erl
@@ -24,5 +24,9 @@ start_link() ->
%% ===================================================================
init([]) ->
- {ok, { {one_for_one, 5, 10}, []} }.
+ PoolOptions = [{name, hackney_pool}],
+ DefaultPool = {hackney_pool,
+ {hackney_pool, start_link, [PoolOptions]},
+ permanent, 10000, worker, [hackney_pool]},
+ {ok, { {one_for_one, 10, 1}, [DefaultPool]}}.
View
13 src/hackney_util.erl
@@ -12,6 +12,7 @@
to_lower/1, to_upper/1,
char_to_lower/1, char_to_upper/1,
join/2,
+ to_hex/1,
token_ci/2, token/2]).
@@ -130,6 +131,18 @@ join([S | Rest], Separator, []) ->
join([S | Rest], Separator, Acc) ->
join(Rest, Separator, [S, Separator, Acc]).
+to_hex([]) ->
+ [];
+to_hex(Bin) when is_binary(Bin) ->
+ to_hex(binary_to_list(Bin));
+to_hex([H|T]) ->
+ [to_digit(H div 16), to_digit(H rem 16) | to_hex(T)].
+
+to_digit(N) when N < 10 -> $0 + N;
+to_digit(N) -> $a + N-10.
+
+
+
%% @doc Parse a case-insensitive token.
%%
%% Changes all characters to lowercase.
Please sign in to comment.
Something went wrong with that request. Please try again.