Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'master' of git://github.com/ferd/lhttpc

  • Loading branch information...
commit 748781b634dcdcb7c4eaade50a4ea916947010ac 2 parents 11fad66 + 73769cb
@lpgauth authored
View
3  README
@@ -10,3 +10,6 @@ Configuration: (environment variables)
kepp a HTTP/1.1 connection open. Changing this value
in runtime has no effect, this can however be done
through lhttpc_manager:update_connection_timeout/1.
+
+NOTE: THIS FORK OF LHTTPC IS ONLY RECOMMENDED IF YOU HAVE MANY REQUESTS TO DO TO A FEW RESTRICTED DOMAINS.
+It contains load-balancing mechanisms described in http://ferd.ca/rtb-where-erlang-blooms.html. It is not meant for general purpose use.
View
1  rebar.config
@@ -1 +1,2 @@
+{erl_opts, [debug_info]}.
{cover_enabled, true}.
View
10 src/lhttpc.app.src
@@ -29,11 +29,11 @@
%%% @end
{application, lhttpc,
[{description, "Lightweight HTTP Client"},
- {vsn, "1.2.6"},
- {modules, []},
- {registered, [lhttpc_manager]},
+ {vsn, "1.2.8"},
+ {modules, [lhttpc,lhttpc_sup,lhttpc_client,lhttpc_sock,lhttp_lb]},
+ {registered, [lhttpc_sup]},
{applications, [kernel, stdlib, ssl, crypto]},
- {mod, {lhttpc, nil}},
- {env, []}
+ {mod, {lhttpc, []}},
+ {env, [{connection_timeout, 300000}]}
]}.
View
50 src/lhttpc.erl
@@ -52,7 +52,7 @@
%% @hidden
-spec start(normal | {takeover, node()} | {failover, node()}, any()) ->
{ok, pid()}.
-start(_, _) ->
+start(_, Opts) ->
case lists:member({seed,1}, ssl:module_info(exports)) of
true ->
% Make sure that the ssl random number generator is seeded
@@ -61,7 +61,9 @@ start(_, _) ->
false ->
ok
end,
- lhttpc_sup:start_link().
+ if is_list(Opts) -> lhttpc_sup:start_link(Opts);
+ true -> lhttpc_sup:start_link()
+ end.
%% @hidden
-spec stop(any()) -> ok.
@@ -320,12 +322,16 @@ request(URL, Method, Hdrs, Body, Timeout, Options) ->
headers(), iolist(), pos_integer(), [option()]) -> result().
request(Host, Port, Ssl, Path, Method, Hdrs, Body, Timeout, Options) ->
verify_options(Options, []),
- ReqId = erlang:now(),
+ ReqId = now(),
case proplists:is_defined(stream_to, Options) of
true ->
StreamTo = proplists:get_value(stream_to, Options),
Args = [ReqId, StreamTo, Host, Port, Ssl, Path, Method, Hdrs, Body, Options],
- Pid = spawn(lhttpc_client, request_with_timeout, [Timeout, Args]),
+ Pid = spawn(lhttpc_client, request, Args),
+ spawn(fun() ->
+ R = kill_client_after(Pid, Timeout),
+ StreamTo ! {response, ReqId, Pid, R}
+ end),
{ReqId, Pid};
false ->
Args = [ReqId, self(), Host, Port, Ssl, Path, Method, Hdrs, Body, Options],
@@ -338,7 +344,7 @@ request(Host, Port, Ssl, Path, Method, Hdrs, Body, Timeout, Options) ->
% linked client send us an exit signal, since this can be
% caught by the caller.
exit(Reason);
- {'EXIT', ReqId, Pid, Reason} ->
+ {'EXIT', Pid, Reason} ->
% This could happen if the process we're running in taps exits
% and the client process exits due to some exit signal being
% sent to it. Very unlikely though
@@ -397,7 +403,7 @@ send_body_part({Pid, 0}, IoList, Timeout) when is_pid(Pid) ->
send_body_part({Pid, 1}, IoList, Timeout);
{response, _ReqId, Pid, R} ->
R;
- {exit, Pid, Reason} ->
+ {exit, _ReqId, Pid, Reason} ->
exit(Reason);
{'EXIT', Pid, Reason} ->
exit(Reason)
@@ -410,9 +416,9 @@ send_body_part({Pid, Window}, IoList, _Timeout) when Window > 0, is_pid(Pid) ->
receive
{ack, Pid} ->
{ok, {Pid, Window}};
- {response, _ReqId, Pid, R} ->
+ {reponse, _ReqId, Pid, R} ->
R;
- {exit, Pid, Reason} ->
+ {exit, _ReqId, Pid, Reason} ->
exit(Reason);
{'EXIT', Pid, Reason} ->
exit(Reason)
@@ -515,7 +521,7 @@ read_response(Pid, Timeout) ->
read_response(Pid, Timeout);
{response, _ReqId, Pid, R} ->
R;
- {exit, Pid, Reason} ->
+ {exit, _ReqId, Pid, Reason} ->
exit(Reason);
{'EXIT', Pid, Reason} ->
exit(Reason)
@@ -537,6 +543,23 @@ kill_client(Pid) ->
erlang:error(Reason)
end.
+kill_client_after(Pid, Timeout) ->
+ erlang:monitor(process, Pid),
+ receive
+ {'DOWN', _, process, Pid, _Reason} -> exit(normal)
+ after Timeout ->
+ catch unlink(Pid), % or we'll kill ourself :O
+ exit(Pid, timeout),
+ receive
+ {'DOWN', _, process, Pid, timeout} ->
+ {error, timeout};
+ {'DOWN', _, process, Pid, Reason} ->
+ erlang:error(Reason)
+ after 1000 ->
+ exit(normal) % silent failure!
+ end
+ end.
+
-spec verify_options(options(), options()) -> ok.
verify_options([{send_retry, N} | Options], Errors)
when is_integer(N), N >= 0 ->
@@ -551,11 +574,8 @@ verify_options([{connection_timeout, infinity} | Options], Errors) ->
verify_options([{connection_timeout, MS} | Options], Errors)
when is_integer(MS), MS >= 0 ->
verify_options(Options, Errors);
-verify_options([{max_connections, MS} | Options], Errors)
- when is_integer(MS), MS >= 0 ->
- verify_options(Options, Errors);
-verify_options([{stream_to, Pid} | Options], Errors)
- when is_pid(Pid) ->
+verify_options([{max_connections, N} | Options], Errors)
+ when is_integer(N), N > 0 ->
verify_options(Options, Errors);
verify_options([{partial_upload, WindowSize} | Options], Errors)
when is_integer(WindowSize), WindowSize >= 0 ->
@@ -574,6 +594,8 @@ verify_options([{partial_download, DownloadOptions} | Options], Errors)
verify_options([{connect_options, List} | Options], Errors)
when is_list(List) ->
verify_options(Options, Errors);
+verify_options([{stream_to, Pid} | Options], Errors) when is_pid(Pid) ->
+ verify_options(Options, Errors);
verify_options([Option | Options], Errors) ->
verify_options(Options, [Option | Errors]);
verify_options([], []) ->
View
83 src/lhttpc_client.erl
@@ -29,22 +29,20 @@
%%% @doc
%%% This module implements the HTTP request handling. This should normally
%%% not be called directly since it should be spawned by the lhttpc module.
-%%% @end
-module(lhttpc_client).
--export([request/10, request_with_timeout/2]).
+-export([request/10]).
-include("lhttpc_types.hrl").
-record(client_state, {
- req_id :: tuple(),
+ req_id :: term(),
host :: string(),
port = 80 :: integer(),
ssl = false :: true | false,
method :: string(),
request :: iolist(),
request_headers :: headers(),
- load_balancer:: pid(),
socket,
connect_timeout = infinity :: timeout(),
connect_options = [] :: [any()],
@@ -63,15 +61,10 @@
-define(CONNECTION_HDR(HDRS, DEFAULT),
string:to_lower(lhttpc_lib:header_value("connection", HDRS, DEFAULT))).
-request_with_timeout(Timeout, [ReqId, StreamTo, _Host, _Port, _Ssl, _Path, _Method, _Hdrs, _Body, _Options] = Args) ->
- TimerRef = erlang:send_after(Timeout, lhttpc_manager, {kill_client_after_timeout, ReqId, self(), StreamTo}),
- ok = apply(?MODULE, request, Args),
- erlang:cancel_timer(TimerRef).
-
--spec request(tuple(), pid(), string(), 1..65535, true | false, string(),
+-spec request(term(), pid(), string(), 1..65535, true | false, string(),
string() | atom(), headers(), iolist(), [option()]) -> no_return().
%% @spec (ReqId, From, Host, Port, Ssl, Path, Method, Hdrs, RequestBody, Options) -> ok
-%% ReqId = tuple()
+%% ReqId = term()
%% From = pid()
%% Host = string()
%% Port = integer()
@@ -108,14 +101,16 @@ execute(ReqId, From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
PartialUpload = proplists:is_defined(partial_upload, Options),
PartialDownload = proplists:is_defined(partial_download, Options),
PartialDownloadOptions = proplists:get_value(partial_download, Options, []),
- ConnectOptions = proplists:get_value(connect_options, Options, []),
NormalizedMethod = lhttpc_lib:normalize_method(Method),
MaxConnections = proplists:get_value(max_connections, Options, 10),
ConnectionTimeout = proplists:get_value(connection_timeout, Options, infinity),
{ChunkedUpload, Request} = lhttpc_lib:format_request(Path, NormalizedMethod,
Hdrs, Host, Port, Body, PartialUpload),
- LbRequest = {lb, Host, Port, Ssl, MaxConnections, ConnectionTimeout},
- {ok, Lb} = gen_server:call(lhttpc_manager, LbRequest, infinity),
+ Socket = case lhttpc_lb:checkout(Host, Port, Ssl, MaxConnections, ConnectionTimeout) of
+ {ok, S} -> S; % Re-using HTTP/1.1 connections
+ retry_later -> throw(retry_later);
+ no_socket -> undefined % Opening a new HTTP/1.1 connection
+ end,
State = #client_state{
req_id = ReqId,
host = Host,
@@ -125,10 +120,10 @@ execute(ReqId, From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
request = Request,
requester = From,
request_headers = Hdrs,
- load_balancer = Lb,
+ socket = Socket,
connect_timeout = proplists:get_value(connect_timeout, Options,
infinity),
- connect_options = ConnectOptions,
+ connect_options = proplists:get_value(connect_options, Options, []),
attempts = 1 + proplists:get_value(send_retry, Options, 1),
partial_upload = PartialUpload,
upload_window = UploadWindowSize,
@@ -143,56 +138,59 @@ execute(ReqId, From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
{R, undefined} ->
{ok, R};
{R, NewSocket} ->
- case lhttpc_sock:controlling_process(NewSocket, Lb, Ssl) of
- ok ->
- gen_server:cast(Lb, {store, NewSocket});
- _ ->
- ok
- end,
+ % The socket we ended up doing the request over is returned
+ % here, it might be the same as Socket, but we don't know.
+ lhttpc_lb:checkin(Host, Port, Ssl, NewSocket),
{ok, R}
end,
{response, ReqId, self(), Response}.
send_request(#client_state{attempts = 0}) ->
+ % Don't try again if the number of allowed attempts is 0.
throw(connection_closed);
send_request(#client_state{socket = undefined} = State) ->
+ Host = State#client_state.host,
+ Port = State#client_state.port,
+ Ssl = State#client_state.ssl,
+ Timeout = State#client_state.connect_timeout,
ConnectOptions = State#client_state.connect_options,
- ConnectTimeout = State#client_state.connect_timeout,
- Lb = State#client_state.load_balancer,
- SocketRequest = {socket, self(), ConnectOptions, ConnectTimeout},
- case gen_server:call(Lb, SocketRequest, infinity) of
+ SocketOptions = [binary, {packet, http}, {active, false} | ConnectOptions],
+ case lhttpc_sock:connect(Host, Port, SocketOptions, Timeout, Ssl) of
{ok, Socket} ->
- lhttpc_sock:setopts(Socket, [{active, false}], State#client_state.ssl),
send_request(State#client_state{socket = Socket});
+ {error, etimedout} ->
+ % TCP stack decided to give up
+ throw(connect_timeout);
+ {error, timeout} ->
+ throw(connect_timeout);
{error, Reason} ->
- throw(Reason)
+ erlang:error(Reason)
end;
send_request(State) ->
- Lb = State#client_state.load_balancer,
Socket = State#client_state.socket,
Ssl = State#client_state.ssl,
Request = State#client_state.request,
case lhttpc_sock:send(Socket, Request, Ssl) of
ok ->
if
- State#client_state.partial_upload -> partial_upload(State);
+ State#client_state.partial_upload -> partial_upload(State);
not State#client_state.partial_upload -> read_response(State)
end;
{error, closed} ->
- gen_server:cast(Lb, {remove, Socket}),
+ lhttpc_sock:close(Socket, Ssl),
NewState = State#client_state{
socket = undefined,
attempts = State#client_state.attempts - 1
},
send_request(NewState);
{error, Reason} ->
- gen_server:cast(Lb, {remove, Socket}),
+ lhttpc_sock:close(Socket, Ssl),
erlang:error(Reason)
end.
partial_upload(State) ->
Response = {ok, {self(), State#client_state.upload_window}},
- State#client_state.requester ! {response, State#client_state.req_id, self(), Response},
+ State#client_state.requester ! {response,State#client_state.req_id, self(), Response},
partial_upload_loop(State#client_state{attempts = 1, request = undefined}).
partial_upload_loop(State = #client_state{requester = Pid}) ->
@@ -233,8 +231,8 @@ encode_body_part(#client_state{chunked_upload = false}, Data) ->
check_send_result(_State, ok) ->
ok;
-check_send_result(#client_state{socket = Socket, load_balancer = Lb}, {error, Reason}) ->
- gen_server:cast(Lb, {remove, Socket}),
+check_send_result(#client_state{socket = Sock, ssl = Ssl}, {error, Reason}) ->
+ lhttpc_sock:close(Sock, Ssl),
throw(Reason).
read_response(#client_state{socket = Socket, ssl = Ssl} = State) ->
@@ -242,7 +240,6 @@ read_response(#client_state{socket = Socket, ssl = Ssl} = State) ->
read_response(State, nil, {nil, nil}, []).
read_response(State, Vsn, {StatusCode, _} = Status, Hdrs) ->
- Lb = State#client_state.load_balancer,
Socket = State#client_state.socket,
Ssl = State#client_state.ssl,
case lhttpc_sock:recv(Socket, Ssl) of
@@ -265,7 +262,7 @@ read_response(State, Vsn, {StatusCode, _} = Status, Hdrs) ->
Response = handle_response_body(State, Vsn, Status, Hdrs),
NewHdrs = element(2, Response),
ReqHdrs = State#client_state.request_headers,
- NewSocket = maybe_close_socket(Lb, Socket, Vsn, ReqHdrs, NewHdrs),
+ NewSocket = maybe_close_socket(Socket, Ssl, Vsn, ReqHdrs, NewHdrs),
{Response, NewSocket};
{error, closed} ->
% Either we only noticed that the socket was closed after we
@@ -273,14 +270,14 @@ read_response(State, Vsn, {StatusCode, _} = Status, Hdrs) ->
% the request on the wire or the server has some issues and is
% closing connections without sending responses.
% If this the first attempt to send the request, we will try again.
- gen_server:cast(Lb, {remove, Socket}),
+ lhttpc_sock:close(Socket, Ssl),
NewState = State#client_state{
socket = undefined,
attempts = State#client_state.attempts - 1
},
send_request(NewState);
{error, timeout} ->
- gen_server:cast(Lb, {remove, Socket}),
+ lhttpc_sock:close(Socket, Ssl),
NewState = State#client_state{
socket = undefined,
attempts = 0
@@ -622,22 +619,22 @@ read_until_closed(Socket, Acc, Hdrs, Ssl) ->
erlang:error(Reason)
end.
-maybe_close_socket(Lb, Socket, {1, Minor}, ReqHdrs, RespHdrs) when Minor >= 1->
+maybe_close_socket(Socket, Ssl, {1, Minor}, ReqHdrs, RespHdrs) when Minor >= 1->
ClientConnection = ?CONNECTION_HDR(ReqHdrs, "keep-alive"),
ServerConnection = ?CONNECTION_HDR(RespHdrs, "keep-alive"),
if
ClientConnection =:= "close"; ServerConnection =:= "close" ->
- gen_server:cast(Lb, {remove, Socket}),
+ lhttpc_sock:close(Socket, Ssl),
undefined;
ClientConnection =/= "close", ServerConnection =/= "close" ->
Socket
end;
-maybe_close_socket(Lb, Socket, _, ReqHdrs, RespHdrs) ->
+maybe_close_socket(Socket, Ssl, _, ReqHdrs, RespHdrs) ->
ClientConnection = ?CONNECTION_HDR(ReqHdrs, "keep-alive"),
ServerConnection = ?CONNECTION_HDR(RespHdrs, "close"),
if
ClientConnection =:= "close"; ServerConnection =/= "keep-alive" ->
- gen_server:cast(Lb, {remove, Socket}),
+ lhttpc_sock:close(Socket, Ssl),
undefined;
ClientConnection =/= "close", ServerConnection =:= "keep-alive" ->
Socket
View
382 src/lhttpc_lb.erl
@@ -1,182 +1,250 @@
+%%% Load balancer for lhttpc, replacing the older lhttpc_manager.
+%%% Takes a similar stance of storing used-but-not-closed sockets.
+%%% Also adds functionality to limit the number of simultaneous
+%%% connection attempts from clients.
-module(lhttpc_lb).
+-behaviour(gen_server).
+-export([start_link/5, checkout/5, checkin/3, checkin/4]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ code_change/3, terminate/2]).
--export([
- start_link/1
- ]).
--export([
- init/1,
- handle_call/3,
- handle_cast/2,
- handle_info/2,
- code_change/3,
- terminate/2
- ]).
+-define(SHUTDOWN_DELAY, 10000).
--behaviour(gen_server).
+%% TODO: transfert_socket, in case of checkout+give_away
--record(httpc_man, {
- host :: string(),
- port = 80 :: integer(),
- ssl = false :: true | false,
- max_connections = 10 :: non_neg_integer(),
- connection_timeout = 300000 :: non_neg_integer(),
- sockets,
- available_sockets = []
- }).
-
-%% @spec (any()) -> {ok, pid()}
-%% @doc Starts and link to the gen server.
-%% This is normally called by a supervisor.
-%% @end
--spec start_link(any()) -> {ok, pid()}.
-start_link([Dest, Opts]) ->
- gen_server:start_link(?MODULE, [Dest, Opts], []).
-
-%% @hidden
--spec init(any()) -> {ok, #httpc_man{}}.
-init([{Host, Port, Ssl}, {MaxConnections, ConnectionTimeout}]) ->
- State = #httpc_man{
- host = Host,
- port = Port,
- ssl = Ssl,
- max_connections = MaxConnections,
- connection_timeout = ConnectionTimeout,
- sockets = ets:new(sockets, [set])
- },
- {ok, State}.
+-record(state, {host :: host(),
+ port :: port(),
+ ssl :: boolean(),
+ max_conn :: max_connections(),
+ timeout :: timeout(),
+ clients :: ets:tid(),
+ free=[] :: list()}).
+
+-export_types([host/0, port/0, max_connections/0, connection_timeout/0]).
+-type host() :: inet:ip_address()|string().
+-type port_number() :: 1..65535.
+-type max_connections() :: pos_integer().
+-type connection_timeout() :: timeout().
+
+
+-spec start_link(host(), port_number(), SSL::boolean(),
+ max_connections(), connection_timeout()) -> {ok, pid()}.
+start_link(Host, Port, Ssl, MaxConn, ConnTimeout) ->
+ gen_server:start_link(?MODULE, {Host, Port, Ssl, MaxConn, ConnTimeout}, []).
+
+-spec checkout(host(), port_number(), SSL::boolean(),
+ max_connections(), connection_timeout()) ->
+ {ok, port()} | retry_later | no_socket.
+checkout(Host, Port, Ssl, MaxConn, ConnTimeout) ->
+ Lb = find_lb({Host,Port,Ssl}, {MaxConn, ConnTimeout}),
+ gen_server:call(Lb, {checkout, self()}, infinity).
+
+%% Called when the socket has died and we're done
+-spec checkin(host(), port_number(), SSL::boolean()) -> ok.
+checkin(Host, Port, Ssl) ->
+ case find_lb({Host,Port,Ssl}) of
+ {error, undefined} -> ok; % LB is dead. Pretend it's fine -- we don't care
+ {ok, Pid} -> gen_server:cast(Pid, {checkin, self()})
+ end.
-%% @hidden
--spec handle_call(any(), any(), #httpc_man{}) ->
- {reply, any(), #httpc_man{}}.
-handle_call({socket, Pid, ConnectOptions, ConnectTimeout}, _, State) ->
- {Reply, NewState} = find_socket(Pid, ConnectOptions, ConnectTimeout, State),
- {reply, Reply, NewState};
-handle_call(_, _, State) ->
- {reply, {error, unknown_request}, State}.
-
-%% @hidden
--spec handle_cast(any(), #httpc_man{}) -> {noreply, #httpc_man{}}.
-handle_cast({store, Socket}, State) ->
- NewState = store_socket(Socket, State),
- {noreply, NewState};
-handle_cast({remove, Socket}, State) ->
- NewState = remove_socket(Socket, State),
- {noreply, NewState};
-handle_cast({terminate}, State) ->
- terminate(undefined, State),
- {noreply, State};
-handle_cast(_, State) ->
+%% Called when we're done and the socket can still be reused
+-spec checkin(host(), port_number(), SSL::boolean(), Socket::port()) -> ok.
+checkin(Host, Port, Ssl, Socket) ->
+ case find_lb({Host,Port,Ssl}) of
+ {error, undefined} ->
+ %% should we close the socket? We're not keeping it! There are no
+ %% Lbs available!
+ ok;
+ {ok, Pid} ->
+ %% Give ownership back to the server ASAP. The client has to have
+ %% kept the socket passive. We rely on its good behaviour.
+ %% If the transfer doesn't work, we don't notify.
+ case lhttpc_sock:controlling_process(Socket, Pid, Ssl) of
+ ok -> gen_server:cast(Pid, {checkin, self(), Socket});
+ _ -> ok
+ end
+ end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%%% GEN_SERVER CALLBACKS %%%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+init({Host,Port,Ssl,MaxConn,ConnTimeout}) ->
+ %% we must use insert_new because it is possible a concurrent request is
+ %% starting such a server at exactly the same time.
+ case ets:insert_new(?MODULE, {{Host,Port,Ssl}, self()}) of
+ true ->
+ {ok, #state{host=Host,
+ port=Port,
+ ssl=Ssl,
+ max_conn=MaxConn,
+ timeout=ConnTimeout,
+ clients=ets:new(clients, [set, private])}};
+ false ->
+ ignore
+ end.
+
+handle_call({checkout,Pid}, _From, S = #state{free=[], max_conn=Max, clients=Tid}) ->
+ Size = ets:info(Tid, size),
+ case Max > Size of
+ true ->
+ Ref = erlang:monitor(process, Pid),
+ ets:insert(Tid, {Pid, Ref}),
+ {reply, no_socket, S};
+ false ->
+ {reply, retry_later, S}
+ end;
+handle_call({checkout,Pid}, _From, S = #state{free=[{Taken,Timer}|Free], clients=Tid, ssl=Ssl}) ->
+ lhttpc_sock:setopts(Taken, [{active,false}], Ssl),
+ case lhttpc_sock:controlling_process(Taken, Pid, Ssl) of
+ ok ->
+ cancel_timer(Timer, Taken),
+ add_client(Tid,Pid),
+ {reply, {ok, Taken}, S#state{free=Free}};
+ {error, badarg} ->
+ %% The caller died.
+ lhttpc_sock:setopts(Taken, [{active, once}], Ssl),
+ {noreply, S};
+ {error, _Reason} -> % socket is closed or something
+ cancel_timer(Timer, Taken),
+ handle_call({checkout,Pid}, _From, S#state{free=Free})
+ end;
+handle_call(_Msg, _From, S) ->
+ {noreply, S}.
+
+handle_cast({checkin, Pid}, S = #state{clients=Tid}) ->
+ remove_client(Tid, Pid),
+ noreply_maybe_shutdown(S);
+handle_cast({checkin, Pid, Socket}, S = #state{ssl=Ssl, clients=Tid, free=Free, timeout=T}) ->
+ remove_client(Tid, Pid),
+ %% the client cast function took care of giving us ownership
+ case lhttpc_sock:setopts(Socket, [{active, once}], Ssl) of
+ ok ->
+ Timer = start_timer(Socket,T),
+ {noreply, S#state{free=[{Socket,Timer}|Free]}};
+ {error, _E} -> % socket closed or failed
+ noreply_maybe_shutdown(S)
+ end;
+handle_cast(_Msg, State) ->
{noreply, State}.
-%% @hidden
--spec handle_info(any(), #httpc_man{}) -> {noreply, #httpc_man{}}.
+handle_info({'DOWN', _Ref, process, Pid, _Reason}, S=#state{clients=Tid}) ->
+ %% Client died
+ remove_client(Tid,Pid),
+ noreply_maybe_shutdown(S);
handle_info({tcp_closed, Socket}, State) ->
- {noreply, remove_socket(Socket, State)};
+ noreply_maybe_shutdown(remove_socket(Socket,State));
handle_info({ssl_closed, Socket}, State) ->
- {noreply, remove_socket(Socket, State)};
+ noreply_maybe_shutdown(remove_socket(Socket,State));
handle_info({timeout, Socket}, State) ->
- {noreply, remove_socket(Socket, State)};
+ noreply_maybe_shutdown(remove_socket(Socket,State));
handle_info({tcp_error, Socket, _}, State) ->
- {noreply, remove_socket(Socket, State)};
+ noreply_maybe_shutdown(remove_socket(Socket,State));
handle_info({ssl_error, Socket, _}, State) ->
- {noreply, remove_socket(Socket, State)};
+ noreply_maybe_shutdown(remove_socket(Socket,State));
handle_info({tcp, Socket, _}, State) ->
- {noreply, remove_socket(Socket, State)}; % got garbage
+ noreply_maybe_shutdown(remove_socket(Socket,State));
handle_info({ssl, Socket, _}, State) ->
- {noreply, remove_socket(Socket, State)}; % got garbage
-handle_info(_, State) ->
+ noreply_maybe_shutdown(remove_socket(Socket,State));
+handle_info(timeout, State) ->
+ {stop, normal, State};
+handle_info(_Info, State) ->
{noreply, State}.
-%% @hidden
--spec terminate(any(), #httpc_man{}) -> ok.
-terminate(_, State) ->
- close_sockets(State#httpc_man.sockets, State#httpc_man.ssl).
-
-%% @hidden
--spec code_change(any(), #httpc_man{}, any()) -> #httpc_man{}.
-code_change(_, State, _) ->
- State.
-
-find_socket(Pid, ConnectOptions, ConnectTimeout, State) ->
- Host = State#httpc_man.host,
- Port = State#httpc_man.port,
- Ssl = State#httpc_man.ssl,
- case State#httpc_man.available_sockets of
- [Socket|Available] ->
- case lhttpc_sock:controlling_process(Socket, Pid, Ssl) of
- ok ->
- [{Socket,Timer}] = ets:lookup(State#httpc_man.sockets, Socket),
- cancel_timer(Timer, Socket),
- NewState = State#httpc_man{available_sockets = Available},
- {{ok, Socket}, NewState};
- {error, badarg} ->
- lhttpc_sock:setopts(Socket, [{active, once}], Ssl),
- {{error, no_pid}, State};
- {error, _Reason} ->
- NewState = State#httpc_man{available_sockets = Available},
- find_socket(Pid, ConnectOptions, ConnectTimeout, remove_socket(Socket, NewState))
- end;
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Reason, #state{host=H, port=P, ssl=Ssl, free=Free, clients=Tid}) ->
+ ets:delete(Tid),
+ ets:delete(?MODULE,{H,P,Ssl}),
+ [lhttpc_sock:close(Socket,Ssl) || {Socket, _TimerRef} <- Free],
+ ok.
+
+%%%%%%%%%%%%%%%
+%%% PRIVATE %%%
+%%%%%%%%%%%%%%%
+
+%% Potential race condition: if the lb shuts itself down after a while, it
+%% might happen between a read and the use of the pid. A busy load balancer
+%% should not have this problem.
+-spec find_lb(Name::{host(),port_number(),boolean()}, {max_connections(), connection_timeout()}) -> pid().
+find_lb(Name = {Host,Port,Ssl}, Args={MaxConn, ConnTimeout}) ->
+ case ets:lookup(?MODULE, Name) of
[] ->
- MaxSockets = State#httpc_man.max_connections,
- Size = ets:info(State#httpc_man.sockets, size),
- case MaxSockets > Size andalso Size =/= undefined of
- true ->
- SocketOptions = [binary, {packet, http}, {active, false} | ConnectOptions],
- case lhttpc_sock:connect(Host, Port, SocketOptions, ConnectTimeout, Ssl) of
- {ok, Socket} ->
- find_socket(Pid, ConnectOptions, ConnectTimeout, store_socket(Socket, State));
- {error, etimedout} ->
- {{error, sys_timeout}, State};
- {error, timeout} ->
- {{error, timeout}, State};
- {error, Reason} ->
- {{error, Reason}, State}
- end;
- false ->
- {{error, retry_later}, State}
- end
+ case supervisor:start_child(lhttpc_sup, [Host,Port,Ssl,MaxConn,ConnTimeout]) of
+ {ok, undefined} -> find_lb(Name,Args);
+ {ok, Pid} -> Pid
+ end;
+ [{_Name, Pid}] ->
+ case is_process_alive(Pid) of % lb died, stale entry
+ true -> Pid;
+ false ->
+ ets:delete(?MODULE, Name),
+ find_lb(Name,Args)
+ end
end.
-remove_socket(Socket, State) ->
- case ets:lookup(State#httpc_man.sockets, Socket) of
- [{Socket,Timer}] ->
- cancel_timer(Timer, Socket),
- lhttpc_sock:close(Socket, State#httpc_man.ssl),
- ets:delete(State#httpc_man.sockets, Socket);
- [] ->
- ok
- end,
- State.
-
-store_socket(Socket, State) ->
- Timeout = State#httpc_man.connection_timeout,
- Timer = case Timeout of
- infinity -> undefined;
- _Other -> erlang:send_after(Timeout, self(), {timeout, Socket})
- end,
- lhttpc_sock:setopts(Socket, [{active, once}], State#httpc_man.ssl),
- ets:insert(State#httpc_man.sockets, {Socket, Timer}),
- State#httpc_man{available_sockets = [Socket|State#httpc_man.available_sockets]}.
-
-close_sockets(Sockets, Ssl) ->
- ets:foldl(
- fun({Socket, undefined}, _) ->
- lhttpc_sock:close(Socket, Ssl);
- ({Socket, Timer}, _) ->
- erlang:cancel_timer(Timer),
- lhttpc_sock:close(Socket, Ssl)
- end, ok, Sockets
- ).
-
-cancel_timer(undefined, _Socket) ->
- ok;
-cancel_timer(Timer, Socket) ->
- case erlang:cancel_timer(Timer) of
+%% Version of the function to be used when we don't want to start a load balancer
+%% if none is found
+-spec find_lb(Name::{host(),port_number(),boolean()}) -> {error,undefined} | {ok,pid()}.
+find_lb(Name={_Host,_Port,_Ssl}) ->
+ case ets:lookup(?MODULE, Name) of
+ [] -> {error, undefined};
+ [{_Name, Pid}] ->
+ case erlang:is_process_alive(Pid) of
+ true -> {ok, Pid};
+ false -> % lb died, stale entry
+ ets:delete(?MODULE,Name),
+ {error, undefined}
+ end
+ end.
+
+-spec add_client(ets:tid(), pid()) -> true.
+add_client(Tid, Pid) ->
+ Ref = erlang:monitor(process, Pid),
+ ets:insert(Tid, {Pid, Ref}).
+
+-spec remove_client(ets:tid(), pid()) -> true.
+remove_client(Tid, Pid) ->
+ case ets:lookup(Tid, Pid) of
+ [] -> ok; % client already removed
+ [{_Pid, Ref}] ->
+ erlang:demonitor(Ref, [flush]),
+ ets:delete(Tid, Pid)
+ end.
+
+-spec remove_socket(port(), #state{}) -> #state{}.
+remove_socket(Socket, S = #state{ssl=Ssl, free=Free}) ->
+ lhttpc_sock:close(Socket, Ssl),
+ S#state{free=drop_and_cancel(Socket,Free)}.
+
+-spec drop_and_cancel(port(), [{port(), reference()}]) -> [{port(), reference()}].
+drop_and_cancel(_, []) -> [];
+drop_and_cancel(Socket, [{Socket, TimerRef} | Rest]) ->
+ cancel_timer(TimerRef, Socket),
+ Rest;
+drop_and_cancel(Socket, [H|T]) ->
+ [H | drop_and_cancel(Socket, T)].
+
+-spec cancel_timer(reference(), port()) -> ok.
+cancel_timer(TimerRef, Socket) ->
+ case erlang:cancel_timer(TimerRef) of
false ->
receive
{timeout, Socket} -> ok
- after
- 0 -> ok
+ after 0 -> ok
end;
_ -> ok
end.
+
+-spec start_timer(port(), connection_timeout()) -> reference().
+start_timer(_, infinity) -> make_ref(); % dummy timer
+start_timer(Socket, Timeout) ->
+ erlang:send_after(Timeout, self(), {timeout,Socket}).
+
+noreply_maybe_shutdown(S=#state{clients=Tid, free=Free}) ->
+ case Free =:= [] andalso ets:info(Tid, size) =:= 0 of
+ true -> % we're done for
+ {noreply,S,?SHUTDOWN_DELAY};
+ false ->
+ {noreply, S}
+ end.
View
3  src/lhttpc_lib.erl
@@ -28,7 +28,6 @@
%%% @author Oscar Hellström <oscar@hellstrom.st>
%%% @doc
%%% This module implements various library functions used in lhttpc.
-%%% @end
-module(lhttpc_lib).
-export([
@@ -177,7 +176,7 @@ format_hdrs(Headers) ->
format_hdrs([{Hdr, Value} | T], Acc) ->
NewAcc = [
- maybe_atom_to_list(Hdr), ":", maybe_atom_to_list(Value), "\r\n" | Acc
+ maybe_atom_to_list(Hdr), ": ", maybe_atom_to_list(Value), "\r\n" | Acc
],
format_hdrs(T, NewAcc);
format_hdrs([], Acc) ->
View
125 src/lhttpc_manager.erl
@@ -1,125 +0,0 @@
-%%% ----------------------------------------------------------------------------
-%%% Copyright (c) 2009, Erlang Training and Consulting Ltd.
-%%% All rights reserved.
-%%%
-%%% Redistribution and use in source and binary forms, with or without
-%%% modification, are permitted provided that the following conditions are met:
-%%% * Redistributions of source code must retain the above copyright
-%%% notice, this list of conditions and the following disclaimer.
-%%% * Redistributions in binary form must reproduce the above copyright
-%%% notice, this list of conditions and the following disclaimer in the
-%%% documentation and/or other materials provided with the distribution.
-%%% * Neither the name of Erlang Training and Consulting Ltd. nor the
-%%% names of its contributors may be used to endorse or promote products
-%%% derived from this software without specific prior written permission.
-%%%
-%%% THIS SOFTWARE IS PROVIDED BY Erlang Training and Consulting Ltd. ''AS IS''
-%%% AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-%%% IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-%%% ARE DISCLAIMED. IN NO EVENT SHALL Erlang Training and Consulting Ltd. BE
-%%% LIABLE SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
-%%% BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
-%%% WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
-%%% OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
-%%% ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-%%% ----------------------------------------------------------------------------
-
-%%% @author Oscar Hellström <oscar@hellstrom.st>
-%%% @doc Connection manager for the HTTP client.
-%%% This gen_server is responsible for keeping track of persistent
-%%% connections to HTTP servers. The only interesting API is
-%%% `connection_count/0' and `connection_count/1'.
-%%% The gen_server is supposed to be started by a supervisor, which is
-%%% normally {@link lhttpc_sup}.
-%%% @end
--module(lhttpc_manager).
-
--export([
- start_link/0
- ]).
--export([
- init/1,
- handle_call/3,
- handle_cast/2,
- handle_info/2,
- code_change/3,
- terminate/2
- ]).
-
--behaviour(gen_server).
-
--record(httpc_man, {
- destinations = dict:new()
- }).
-
-%% @spec () -> {ok, pid()}
-%% @doc Starts and link to the gen server.
-%% This is normally called by a supervisor.
-%% @end
--spec start_link() -> {ok, pid()} | {error, allready_started}.
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []).
-
-%% @hidden
--spec init(any()) -> {ok, #httpc_man{}}.
-init(_) ->
- process_flag(priority, high),
- {ok, #httpc_man{}}.
-
-%% @hidden
--spec handle_call(any(), any(), #httpc_man{}) ->
- {reply, any(), #httpc_man{}}.
-handle_call({lb, Host, Port, Ssl, MaxConnections, ConnectionTimeout}, _, State) ->
- {Reply, NewState} = find_lb({Host, Port, Ssl}, {MaxConnections, ConnectionTimeout}, State),
- {reply, Reply, NewState};
-handle_call(_, _, State) ->
- {reply, {error, unknown_request}, State}.
-
-%% @hidden
--spec handle_cast(any(), #httpc_man{}) -> {noreply, #httpc_man{}}.
-handle_cast(_, State) ->
- {noreply, State}.
-
-%% @hidden
--spec handle_info(any(), #httpc_man{}) -> {noreply, #httpc_man{}}.
-handle_info({kill_client_after_timeout, ReqId, Pid, StreamTo}, State) ->
- case erlang:is_process_alive(Pid) of
- true ->
- exit(Pid, kill),
- StreamTo ! {response, ReqId, Pid, {error, timeout}};
- false -> ok
- end,
- {noreply, State};
-handle_info(_, State) ->
- {noreply, State}.
-
-%% @hidden
--spec terminate(any(), #httpc_man{}) -> ok.
-terminate(_, State) ->
- close_lbs(State#httpc_man.destinations).
-
-%% @hidden
--spec code_change(any(), #httpc_man{}, any()) -> #httpc_man{}.
-code_change(_, State, _) ->
- State.
-
-find_lb(Dest, Options, State) ->
- Dests = State#httpc_man.destinations,
- case dict:find(Dest, Dests) of
- {ok, Lb} ->
- {{ok, Lb}, State};
- error ->
- {ok, Pid} = lhttpc_lb:start_link([Dest, Options]),
- NewState = State#httpc_man{
- destinations = update_dest(Dest, Pid, Dests)
- },
- {{ok, Pid}, NewState}
- end.
-
-update_dest(Destination, Lb, Destinations) ->
- dict:store(Destination, Lb, Destinations).
-
-close_lbs(Destinations) ->
- lists:foreach(fun({_Dest, Lb}) ->
- gen_server:cast(Lb, {terminate})
- end, dict:to_list(Destinations)).
View
43 src/lhttpc_sock.erl
@@ -29,7 +29,6 @@
%%% @doc
%%% This module implements wrappers for socket operations.
%%% Makes it possible to have the same interface to ssl and tcp sockets.
-%%% @end
-module(lhttpc_sock).
-export([
@@ -60,9 +59,25 @@
-spec connect(host(), integer(), socket_options(), timeout(), boolean()) ->
{ok, socket()} | {error, atom()}.
connect(Host, Port, Options, Timeout, true) ->
- ssl:connect(Host, Port, Options, Timeout);
+ % Avoid port leak with potential race condition in case of timeout
+ Flag = process_flag(trap_exit, true),
+ Res = ssl:connect(Host, Port, Options, Timeout),
+ receive
+ {'EXIT',_Pid,timeout} -> exit(timeout)
+ after 0 ->
+ process_flag(trap_exit, Flag),
+ Res
+ end;
connect(Host, Port, Options, Timeout, false) ->
- gen_tcp:connect(Host, Port, Options, Timeout).
+ % Avoid port leak with potential race condition in case of timeout
+ Flag = process_flag(trap_exit, true),
+ Res = gen_tcp:connect(Host, Port, Options, Timeout),
+ receive
+ {'EXIT',_Pid,timeout} -> exit(timeout)
+ after 0 ->
+ process_flag(trap_exit, Flag),
+ Res
+ end.
%% @spec (Socket, SslFlag) -> {ok, Data} | {error, Reason}
%% Socket = socket()
@@ -80,7 +95,7 @@ connect(Host, Port, Options, Timeout, false) ->
recv(Socket, true) ->
ssl:recv(Socket, 0);
recv(Socket, false) ->
- inet_tcp:recv(Socket, 0).
+ gen_tcp:recv(Socket, 0).
%% @spec (Socket, Length, SslFlag) -> {ok, Data} | {error, Reason}
%% Socket = socket()
@@ -155,6 +170,22 @@ setopts(Socket, Options, false) ->
%% @end
-spec close(socket(), boolean()) -> ok | {error, atom()}.
close(Socket, true) ->
- ssl:close(Socket);
+ % Avoid port leak with potential race condition in case of timeout
+ Flag = process_flag(trap_exit, true),
+ Res = ssl:close(Socket),
+ receive
+ {'EXIT',_Pid,timeout} -> exit(timeout)
+ after 0 ->
+ process_flag(trap_exit, Flag),
+ Res
+ end;
close(Socket, false) ->
- gen_tcp:close(Socket).
+ % Avoid port leak with potential race condition in case of timeout
+ Flag = process_flag(trap_exit, true),
+ Res = gen_tcp:close(Socket),
+ receive
+ {'EXIT',_Pid,timeout} -> exit(timeout)
+ after 0 ->
+ process_flag(trap_exit, Flag),
+ Res
+ end.
View
25 src/lhttpc_sup.erl
@@ -32,7 +32,7 @@
-module(lhttpc_sup).
-behaviour(supervisor).
--export([start_link/0]).
+-export([start_link/0, start_link/1]).
-export([init/1]).
-type child() :: {atom(), {atom(), atom(), list(any)},
@@ -46,12 +46,23 @@
%% @end
-spec start_link() -> {ok, pid()} | {error, atom()}.
start_link() ->
- supervisor:start_link(?MODULE, nil).
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+start_link(Args) ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, Args).
%% @hidden
-spec init(any()) -> {ok, {{atom(), integer(), integer()}, [child()]}}.
-init(_) ->
- LHTTPCManager = {lhttpc_manager, {lhttpc_manager, start_link, []},
- permanent, 10000, worker, [lhttpc_manager]
- },
- {ok, {{one_for_one, 10, 1}, [LHTTPCManager]}}.
+init(Opts) ->
+ init_ets(Opts),
+ {ok, {{simple_one_for_one, 10, 1}, [
+ {load_balancer,
+ {lhttpc_lb, start_link, []},
+ transient, 10000, worker, [lhttpc_lb]}
+ ]}}.
+
+init_ets(Opts) ->
+ ETSOpts = proplists:get_value(ets, Opts, []),
+ %% Only option supported so far -- others do not really make sense at this point
+ ReadConc = {read_concurrency, proplists:get_value(read_concurrency, ETSOpts, false)},
+ ets:new(lhttpc_lb, [named_table, set, public, ReadConc]).
View
29 test/lhttpc_tests.erl
@@ -134,6 +134,7 @@ tcp_test_() ->
?_test(bad_url()),
?_test(persistent_connection()),
?_test(request_timeout()),
+ ?_test(connection_timeout()),
?_test(suspended_manager()),
?_test(chunked_encoding()),
?_test(partial_upload_identity()),
@@ -151,7 +152,8 @@ tcp_test_() ->
?_test(partial_download_smallish_chunks()),
?_test(partial_download_slow_chunks()),
?_test(close_connection()),
- ?_test(message_queue())
+ ?_test(message_queue()),
+ ?_test(connection_count()) % just check that it's 0 (last)
]}
}.
@@ -160,7 +162,8 @@ ssl_test_() ->
{setup, fun start_app/0, fun stop_app/1, [
?_test(ssl_get()),
?_test(ssl_post()),
- ?_test(ssl_chunked())
+ ?_test(ssl_chunked()),
+ ?_test(connection_count()) % just check that it's 0 (last)
]}
}.
@@ -376,6 +379,18 @@ request_timeout() ->
URL = url(Port, "/slow"),
?assertEqual({error, timeout}, lhttpc:request(URL, get, [], 50)).
+connection_timeout() ->
+ Port = start(gen_tcp, [fun simple_response/5, fun simple_response/5]),
+ URL = url(Port, "/close_conn"),
+ lhttpc_manager:update_connection_timeout(50), % very short keep alive
+ {ok, Response} = lhttpc:request(URL, get, [], 100),
+ ?assertEqual({200, "OK"}, status(Response)),
+ ?assertEqual(<<?DEFAULT_STRING>>, body(Response)),
+ timer:sleep(100),
+ ?assertEqual(0,
+ lhttpc_manager:connection_count({"localhost", Port, false})),
+ lhttpc_manager:update_connection_timeout(300000). % set back
+
suspended_manager() ->
Port = start(gen_tcp, [fun simple_response/5, fun simple_response/5]),
URL = url(Port, "/persistent"),
@@ -386,6 +401,8 @@ suspended_manager() ->
true = erlang:suspend_process(Pid),
?assertEqual({error, timeout}, lhttpc:request(URL, get, [], 50)),
true = erlang:resume_process(Pid),
+ ?assertEqual(1,
+ lhttpc_manager:connection_count({"localhost", Port, false})),
{ok, SecondResponse} = lhttpc:request(URL, get, [], 50),
?assertEqual({200, "OK"}, status(SecondResponse)),
?assertEqual(<<?DEFAULT_STRING>>, body(SecondResponse)).
@@ -467,7 +484,7 @@ partial_upload_chunked() ->
?assertEqual(<<?DEFAULT_STRING>>, body(Response1)),
?assertEqual("This is chunky stuff!",
lhttpc_lib:header_value("x-test-orig-body", headers(Response1))),
- ?assertEqual(element(2, Trailer),
+ ?assertEqual(element(2, Trailer),
lhttpc_lib:header_value("x-test-orig-trailer-1", headers(Response1))),
% Make sure it works with no body part in the original request as well
Headers = [{"Transfer-Encoding", "chunked"}],
@@ -480,7 +497,7 @@ partial_upload_chunked() ->
?assertEqual(<<?DEFAULT_STRING>>, body(Response2)),
?assertEqual("This is chunky stuff!",
lhttpc_lib:header_value("x-test-orig-body", headers(Response2))),
- ?assertEqual(element(2, Trailer),
+ ?assertEqual(element(2, Trailer),
lhttpc_lib:header_value("x-test-orig-trailer-1", headers(Response2))).
partial_upload_chunked_no_trailer() ->
@@ -664,6 +681,10 @@ ssl_chunked() ->
?assertEqual("2", lhttpc_lib:header_value("Trailer-2",
headers(SecondResponse))).
+connection_count() ->
+ timer:sleep(50), % give the TCP stack time to deliver messages
+ ?assertEqual(0, lhttpc_manager:connection_count()).
+
invalid_options() ->
?assertError({bad_options, [{foo, bar}, bad_option]},
lhttpc:request("http://localhost/", get, [], <<>>, 1000,
Please sign in to comment.
Something went wrong with that request. Please try again.