Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Initial work #1 - adding load balancer

  • Loading branch information...
commit 41f5ba0dd626133d03be45c41a014b92c08fa312 1 parent dac633e
@lpgauth authored
View
62 src/lhttpc.erl
@@ -1,7 +1,7 @@
%%% ----------------------------------------------------------------------------
%%% Copyright (c) 2009, Erlang Training and Consulting Ltd.
%%% All rights reserved.
-%%%
+%%%
%%% Redistribution and use in source and binary forms, with or without
%%% modification, are permitted provided that the following conditions are met:
%%% * Redistributions of source code must retain the above copyright
@@ -12,7 +12,7 @@
%%% * Neither the name of Erlang Training and Consulting Ltd. nor the
%%% names of its contributors may be used to endorse or promote products
%%% derived from this software without specific prior written permission.
-%%%
+%%%
%%% THIS SOFTWARE IS PROVIDED BY Erlang Training and Consulting Ltd. ''AS IS''
%%% AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
%%% IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
@@ -37,7 +37,7 @@
-export([start/2, stop/1]).
-export([
send_body_part/2,
- send_body_part/3,
+ send_body_part/3,
send_trailers/2,
send_trailers/3
]).
@@ -178,7 +178,7 @@ request(URL, Method, Hdrs, Body, Timeout) ->
%% request(Host, Port, Path, Ssl, Method, Hdrs, Body, Timeout, Options).
%% </pre>
%%
-%% `URL' is expected to be a valid URL:
+%% `URL' is expected to be a valid URL:
%% `scheme://host[:port][/path]'.
%% @end
%% @see request/9
@@ -229,13 +229,13 @@ request(URL, Method, Hdrs, Body, Timeout, Options) ->
%% `Ssl' = `false'<br/>
%% `Path' = `"/foobar"'<br/>
%% `Path' must begin with a forward slash `/'.
-%%
+%%
%% `Method' is either a string, stating the HTTP method exactly as in the
%% protocol, i.e: `"POST"' or `"GET"'. It could also be an atom, which is
%% then coverted to an uppercase (if it isn't already) string.
%%
%% `Hdrs' is a list of headers to send. Mandatory headers such as
-%% `Host', `Content-Length' or `Transfer-Encoding' (for some requests)
+%% `Host', `Content-Length' or `Transfer-Encoding' (for some requests)
%% are added automatically.
%%
%% `Body' is the entity to send in the request. Please don't include entity
@@ -254,7 +254,7 @@ request(URL, Method, Hdrs, Body, Timeout, Options) ->
%% choose to give up earlier than the connect timeout, in which case the
%% client will also give up. The default value is infinity, which means that
%% it will either give up when the TCP stack gives up, or when the overall
-%% request timeout is reached.
+%% request timeout is reached.
%%
%% `{connect_options, Options}' specifies options to pass to the socket at
%% connect time. This makes it possible to specify both SSL options and
@@ -311,7 +311,7 @@ request(URL, Method, Hdrs, Body, Timeout, Options) ->
%% pieces. Note however that the last piece might be smaller than `PartSize'.
%% Size bounded entity bodies are handled the same way as unbounded ones if
%% `PartSize' is `infinity'. If `PartSize' is integer it must be >= 0.
-%% If `{partial_download, PartialDownloadOptions}' is specified the
+%% If `{partial_download, PartialDownloadOptions}' is specified the
%% `ResponseBody' will be a `pid()' unless the response has no body
%% (for example in case of `HEAD' requests). In that case it will be be
%% `undefined'. The functions {@link get_body_part/1} and
@@ -325,18 +325,20 @@ request(Host, Port, Ssl, Path, Method, Hdrs, Body, Timeout, Options) ->
Args = [self(), Host, Port, Ssl, Path, Method, Hdrs, Body, Options],
Pid = spawn_link(lhttpc_client, request, Args),
receive
- {response, Pid, R} ->
- R;
- {exit, Pid, Reason} ->
- % We would rather want to exit here, instead of letting the
- % linked client send us an exit signal, since this can be
- % caught by the caller.
- exit(Reason);
- {'EXIT', Pid, Reason} ->
- % This could happen if the process we're running in taps exits
- % and the client process exits due to some exit signal being
- % sent to it. Very unlikely though
- exit(Reason)
+ X ->
+ X
+ % {response, Pid, R} ->
+ % R;
+ % {exit, Pid, Reason} ->
+ % % We would rather want to exit here, instead of letting the
+ % % linked client send us an exit signal, since this can be
+ % % caught by the caller.
+ % exit(Reason);
+ % {'EXIT', Pid, Reason} ->
+ % % This could happen if the process we're running in taps exits
+ % % and the client process exits due to some exit signal being
+ % % sent to it. Very unlikely though
+ % exit(Reason)
after Timeout ->
kill_client(Pid)
end.
@@ -353,7 +355,7 @@ request(Host, Port, Ssl, Path, Method, Hdrs, Body, Timeout, Options) ->
%% Would be the same as calling
%% `send_body_part(UploadState, BodyPart, infinity)'.
%% @end
--spec send_body_part({pid(), window_size()}, iolist()) ->
+-spec send_body_part({pid(), window_size()}, iolist()) ->
{pid(), window_size()} | result().
send_body_part({Pid, Window}, IoList) ->
send_body_part({Pid, Window}, IoList, infinity).
@@ -371,15 +373,15 @@ send_body_part({Pid, Window}, IoList) ->
%% milliseconds. If there is no acknowledgement received during that time the
%% the request is cancelled and `{error, timeout}' is returned.
%%
-%% As long as the window size is larger than 0 the function will return
+%% As long as the window size is larger than 0 the function will return
%% immediately after sending the body part to the request handling process.
-%%
+%%
%% The `BodyPart' `http_eob' signals an end of the entity body, the request
%% is considered sent and the response will be read from the socket. If
%% there is no response within `Timeout' milliseconds, the request is
%% canceled and `{error, timeout}' is returned.
%% @end
--spec send_body_part({pid(), window_size()}, iolist(), timeout()) ->
+-spec send_body_part({pid(), window_size()}, iolist(), timeout()) ->
{ok, {pid(), window_size()}} | result().
send_body_part({Pid, _Window}, http_eob, Timeout) when is_pid(Pid) ->
Pid ! {body_part, self(), http_eob},
@@ -422,7 +424,7 @@ send_body_part({Pid, Window}, IoList, _Timeout) when Window > 0, is_pid(Pid) ->
%% @doc Sends trailers to an ongoing request when `{partial_upload,
%% WindowSize}' is used and no `Content-Length' was specified. The default
%% timout `infinity' will be used. Plase note that after this the request is
-%% considered complete and the response will be read from the socket.
+%% considered complete and the response will be read from the socket.
%% Would be the same as calling
%% `send_trailers(UploadState, BodyPart, infinity)'.
%% @end
@@ -450,7 +452,7 @@ send_trailers({Pid, Window}, Trailers) ->
%% `Timeout' milliseconds the request is canceled and `{error, timeout}' is
%% returned.
%% @end
--spec send_trailers({pid(), window_size()}, [{string() | string()}],
+-spec send_trailers({pid(), window_size()}, [{string() | string()}],
timeout()) -> result().
send_trailers({Pid, _Window}, Trailers, Timeout)
when is_list(Trailers), is_pid(Pid) ->
@@ -465,7 +467,7 @@ send_trailers({Pid, _Window}, Trailers, Timeout)
%% Value = string() | binary()
%% @doc Reads a body part from an ongoing response when
%% `{partial_download, PartialDownloadOptions}' is used. The default timeout,
-%% `infinity' will be used.
+%% `infinity' will be used.
%% Would be the same as calling
%% `get_body_part(HTTPClient, infinity)'.
%% @end
@@ -482,11 +484,11 @@ get_body_part(Pid) ->
%% Value = string() | binary()
%% @doc Reads a body part from an ongoing response when
%% `{partial_download, PartialDownloadOptions}' is used.
-%% `Timeout' is the timeout for reading the next body part in milliseconds.
+%% `Timeout' is the timeout for reading the next body part in milliseconds.
%% `http_eob' marks the end of the body. If there were Trailers in the
-%% response those are returned with `http_eob' as well.
+%% response those are returned with `http_eob' as well.
%% @end
--spec get_body_part(pid(), timeout()) ->
+-spec get_body_part(pid(), timeout()) ->
{ok, binary()} | {ok, {http_eob, headers()}}.
get_body_part(Pid, Timeout) ->
receive
View
73 src/lhttpc_client.erl
@@ -45,6 +45,7 @@
method :: string(),
request :: iolist(),
request_headers :: headers(),
+ load_balancer:: pid(),
socket,
connect_timeout = infinity :: timeout(),
connect_options = [] :: [any()],
@@ -102,14 +103,12 @@ execute(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
PartialUpload = proplists:is_defined(partial_upload, Options),
PartialDownload = proplists:is_defined(partial_download, Options),
PartialDownloadOptions = proplists:get_value(partial_download, Options, []),
+ ConnectOptions = proplists:get_value(connect_options, Options, []),
NormalizedMethod = lhttpc_lib:normalize_method(Method),
{ChunkedUpload, Request} = lhttpc_lib:format_request(Path, NormalizedMethod,
Hdrs, Host, Port, Body, PartialUpload),
- SocketRequest = {socket, self(), Host, Port, Ssl},
- Socket = case gen_server:call(lhttpc_manager, SocketRequest, infinity) of
- {ok, S} -> S; % Re-using HTTP/1.1 connections
- no_socket -> undefined % Opening a new HTTP/1.1 connection
- end,
+ LbRequest = {lb, Host, Port, Ssl},
+ {ok, Lb} = gen_server:call(lhttpc_manager, LbRequest, infinity),
State = #client_state{
host = Host,
port = Port,
@@ -118,10 +117,10 @@ execute(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
request = Request,
requester = From,
request_headers = Hdrs,
- socket = Socket,
+ load_balancer = Lb,
connect_timeout = proplists:get_value(connect_timeout, Options,
infinity),
- connect_options = proplists:get_value(connect_options, Options, []),
+ connect_options = ConnectOptions,
attempts = 1 + proplists:get_value(send_retry, Options, 1),
partial_upload = PartialUpload,
upload_window = UploadWindowSize,
@@ -136,19 +135,9 @@ execute(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
{R, undefined} ->
{ok, R};
{R, NewSocket} ->
- % The socket we ended up doing the request over is returned
- % here, it might be the same as Socket, but we don't know.
- % I've noticed that we don't want to give send sockets that we
- % can't change the controlling process for to the manager. This
- % really shouldn't fail, but it could do if:
- % * The socket was closed remotely already
- % * Due to an error in this module (returning dead sockets for
- % instance)
- ManagerPid = whereis(lhttpc_manager),
- case lhttpc_sock:controlling_process(NewSocket, ManagerPid, Ssl) of
+ case lhttpc_sock:controlling_process(NewSocket, Lb, Ssl) of
ok ->
- gen_server:cast(lhttpc_manager,
- {done, Host, Port, Ssl, NewSocket});
+ gen_server:cast(Lb, {store, NewSocket});
_ ->
ok
end,
@@ -157,45 +146,37 @@ execute(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
{response, self(), Response}.
send_request(#client_state{attempts = 0}) ->
- % Don't try again if the number of allowed attempts is 0.
throw(connection_closed);
send_request(#client_state{socket = undefined} = State) ->
- Host = State#client_state.host,
- Port = State#client_state.port,
- Ssl = State#client_state.ssl,
- Timeout = State#client_state.connect_timeout,
ConnectOptions = State#client_state.connect_options,
- SocketOptions = [binary, {packet, http}, {active, false} | ConnectOptions],
- case lhttpc_sock:connect(Host, Port, SocketOptions, Timeout, Ssl) of
+ Lb = State#client_state.load_balancer,
+ SocketRequest = {socket, self(), ConnectOptions},
+ case gen_server:call(Lb, SocketRequest, infinity) of
{ok, Socket} ->
send_request(State#client_state{socket = Socket});
- {error, etimedout} ->
- % TCP stack decided to give up
- throw(connect_timeout);
- {error, timeout} ->
- throw(connect_timeout);
{error, Reason} ->
- erlang:error(Reason)
+ throw(Reason)
end;
send_request(State) ->
+ Lb = State#client_state.load_balancer,
Socket = State#client_state.socket,
Ssl = State#client_state.ssl,
Request = State#client_state.request,
case lhttpc_sock:send(Socket, Request, Ssl) of
ok ->
if
- State#client_state.partial_upload -> partial_upload(State);
+ State#client_state.partial_upload -> partial_upload(State);
not State#client_state.partial_upload -> read_response(State)
end;
{error, closed} ->
- lhttpc_sock:close(Socket, Ssl),
+ gen_server:cast(Lb, {remove, Socket}),
NewState = State#client_state{
socket = undefined,
attempts = State#client_state.attempts - 1
},
send_request(NewState);
{error, Reason} ->
- lhttpc_sock:close(Socket, Ssl),
+ gen_server:cast(Lb, {remove, Socket}),
erlang:error(Reason)
end.
@@ -242,8 +223,8 @@ encode_body_part(#client_state{chunked_upload = false}, Data) ->
check_send_result(_State, ok) ->
ok;
-check_send_result(#client_state{socket = Sock, ssl = Ssl}, {error, Reason}) ->
- lhttpc_sock:close(Sock, Ssl),
+check_send_result(#client_state{socket = Socket, load_balancer = Lb}, {error, Reason}) ->
+ gen_server:cast(Lb, {remove, Socket}),
throw(Reason).
read_response(#client_state{socket = Socket, ssl = Ssl} = State) ->
@@ -251,6 +232,7 @@ read_response(#client_state{socket = Socket, ssl = Ssl} = State) ->
read_response(State, nil, {nil, nil}, []).
read_response(State, Vsn, {StatusCode, _} = Status, Hdrs) ->
+ Lb = State#client_state.load_balancer,
Socket = State#client_state.socket,
Ssl = State#client_state.ssl,
case lhttpc_sock:recv(Socket, Ssl) of
@@ -281,14 +263,19 @@ read_response(State, Vsn, {StatusCode, _} = Status, Hdrs) ->
% the request on the wire or the server has some issues and is
% closing connections without sending responses.
% If this the first attempt to send the request, we will try again.
- lhttpc_sock:close(Socket, Ssl),
+ gen_server:cast(Lb, {remove, Socket}),
NewState = State#client_state{
socket = undefined,
attempts = State#client_state.attempts - 1
},
send_request(NewState);
- {error, Reason} ->
- erlang:error(Reason)
+ {error, timeout} ->
+ gen_server:cast(Lb, {remove, Socket}),
+ NewState = State#client_state{
+ socket = undefined,
+ attempts = 0
+ },
+ send_request(NewState)
end.
handle_response_body(#client_state{partial_download = false} = State, Vsn,
@@ -408,7 +395,7 @@ read_body_part(#client_state{part_size = infinity} = State, _ContentLength) ->
end;
read_body_part(#client_state{part_size = PartSize} = State, ContentLength)
when PartSize =< ContentLength ->
- Socket = State#client_state.socket,
+ Socket = State#client_state.socket,
Ssl = State#client_state.ssl,
PartSize = State#client_state.part_size,
case lhttpc_sock:recv(Socket, PartSize, Ssl) of
@@ -419,7 +406,7 @@ read_body_part(#client_state{part_size = PartSize} = State, ContentLength)
end;
read_body_part(#client_state{part_size = PartSize} = State, ContentLength)
when PartSize > ContentLength ->
- Socket = State#client_state.socket,
+ Socket = State#client_state.socket,
Ssl = State#client_state.ssl,
case lhttpc_sock:recv(Socket, ContentLength, Ssl) of
{ok, Data} ->
@@ -630,6 +617,7 @@ maybe_close_socket(Socket, Ssl, {1, Minor}, ReqHdrs, RespHdrs) when Minor >= 1->
ServerConnection = ?CONNECTION_HDR(RespHdrs, "keep-alive"),
if
ClientConnection =:= "close"; ServerConnection =:= "close" ->
+ error_logger:error_report("close"),
lhttpc_sock:close(Socket, Ssl),
undefined;
ClientConnection =/= "close", ServerConnection =/= "close" ->
@@ -640,6 +628,7 @@ maybe_close_socket(Socket, Ssl, _, ReqHdrs, RespHdrs) ->
ServerConnection = ?CONNECTION_HDR(RespHdrs, "close"),
if
ClientConnection =:= "close"; ServerConnection =/= "keep-alive" ->
+ error_logger:error_report("close"),
lhttpc_sock:close(Socket, Ssl),
undefined;
ClientConnection =/= "close", ServerConnection =:= "keep-alive" ->
View
198 src/lhttpc_lb.erl
@@ -0,0 +1,198 @@
+-module(lhttpc_lb).
+
+-export([
+ start_link/1
+ ]).
+-export([
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3,
+ terminate/2
+ ]).
+
+-behaviour(gen_server).
+
+-record(httpc_man, {
+ host :: string(),
+ port = 80 :: integer(),
+ ssl = false :: true | false,
+ sockets = dict:new(),
+ idle_sockets = queue:new(),
+ timeout = 1000000 :: non_neg_integer(),
+ max_sockets = 1 :: non_neg_integer()
+ }).
+
+%% @spec () -> {ok, pid()}
+%% @doc Starts and link to the gen server.
+%% This is normally called by a supervisor.
+%% @end
+-spec start_link(any()) -> {ok, pid()}.
+start_link(Args) ->
+ gen_server:start_link(?MODULE, Args, []).
+
+%% @hidden
+-spec init(any()) -> {ok, #httpc_man{}}.
+init({Host, Port, Ssl}) ->
+ process_flag(priority, high),
+ {ok, Timeout} = application:get_env(lhttpc, connection_timeout),
+ State = #httpc_man{
+ host = Host,
+ port = Port,
+ ssl = Ssl,
+ timeout = Timeout
+ },
+ {ok, State}.
+
+%% @hidden
+-spec handle_call(any(), any(), #httpc_man{}) ->
+ {reply, any(), #httpc_man{}}.
+handle_call({socket, Pid, ConnectOptions}, _, State) ->
+ error_logger:error_report(find),
+ {Reply, NewState} = find_socket(Pid, ConnectOptions, State),
+ {reply, Reply, NewState};
+handle_call(_, _, State) ->
+ {reply, {error, unknown_request}, State}.
+
+%% @hidden
+-spec handle_cast(any(), #httpc_man{}) -> {noreply, #httpc_man{}}.
+handle_cast({remove, Socket}, State) ->
+ error_logger:error_report(remove),
+ NewState = remove_socket(Socket, State),
+ {noreply, NewState};
+handle_cast({store, Socket}, State) ->
+ error_logger:error_report(store),
+ NewState = store_socket(Socket, State),
+ {noreply, NewState};
+handle_cast({terminate}, State) ->
+ terminate(undefined, State),
+ {noreply, State};
+handle_cast({update_timeout, Milliseconds}, State) ->
+ {noreply, State#httpc_man{timeout = Milliseconds}};
+handle_cast(_, State) ->
+ {noreply, State}.
+
+%% @hidden
+-spec handle_info(any(), #httpc_man{}) -> {noreply, #httpc_man{}}.
+handle_info({tcp_closed, Socket}, State) ->
+ error_logger:error_report(tcp_closed),
+ {noreply, remove_socket(Socket, State)};
+handle_info({ssl_closed, Socket}, State) ->
+ error_logger:error_report(ssl_closed),
+ {noreply, remove_socket(Socket, State)};
+handle_info({timeout, Socket}, State) ->
+ error_logger:error_report(timeout),
+ {noreply, remove_socket(Socket, State)};
+handle_info({tcp_error, Socket, _}, State) ->
+ error_logger:error_report(tcp_error),
+ {noreply, remove_socket(Socket, State)};
+handle_info({ssl_error, Socket, _}, State) ->
+ error_logger:error_report(ssl_error),
+ {noreply, remove_socket(Socket, State)};
+handle_info({tcp, Socket, _}, State) ->
+ error_logger:error_report(tcp),
+ {noreply, remove_socket(Socket, State)}; % got garbage
+handle_info({ssl, Socket, _}, State) ->
+ error_logger:error_report(ssl),
+ {noreply, remove_socket(Socket, State)}; % got garbage
+handle_info(_, State) ->
+ error_logger:error_report(other),
+ {noreply, State}.
+
+%% @hidden
+-spec terminate(any(), #httpc_man{}) -> ok.
+terminate(_, State) ->
+ close_sockets(State#httpc_man.sockets, State#httpc_man.ssl).
+
+%% @hidden
+-spec code_change(any(), #httpc_man{}, any()) -> #httpc_man{}.
+code_change(_, State, _) ->
+ State.
+
+find_socket(Pid, ConnectOptions, State) ->
+ Host = State#httpc_man.host,
+ Port = State#httpc_man.port,
+ Ssl = State#httpc_man.ssl,
+ Q1 = State#httpc_man.idle_sockets,
+ case queue:out(Q1) of
+ {{value, Socket}, Q2} ->
+ lhttpc_sock:setopts(Socket, [{active, false}], Ssl),
+ case lhttpc_sock:controlling_process(Socket, Pid, Ssl) of
+ ok ->
+ Timer = dict:fetch(Socket, State#httpc_man.sockets),
+ cancel_timer(Timer, Socket),
+ NewState = State#httpc_man{
+ idle_sockets = Q2
+ },
+ {{ok, Socket}, NewState};
+ {error, badarg} ->
+ lhttpc_sock:setopts(Socket, [{active, once}], Ssl),
+ NewState = State#httpc_man{
+ idle_sockets = queue:in(Socket, Q2)
+ },
+ {{error, no_pid}, NewState};
+ {error, _Reason} ->
+ NewState = State#httpc_man{
+ idle_sockets = Q2
+ },
+ find_socket(Pid, ConnectOptions, remove_socket(Socket, NewState))
+ end;
+ {empty, _Q2} ->
+ MaxSockets = State#httpc_man.max_sockets,
+ case MaxSockets > dict:size(State#httpc_man.sockets) of
+ true ->
+ Timeout = State#httpc_man.timeout,
+ SocketOptions = [binary, {packet, http}, {active, false} | ConnectOptions],
+ case lhttpc_sock:connect(Host, Port, SocketOptions, 1000, Ssl) of
+ {ok, Socket} ->
+ find_socket(Pid, ConnectOptions, store_socket(Socket, State));
+ {error, etimedout} ->
+ {{error, sys_timeout}, State};
+ {error, timeout} ->
+ {{error, timeout}, State};
+ {error, Reason} ->
+ {{error, Reason}, State}
+ end;
+ false ->
+ {{error, retry_later}, State}
+ end
+ end.
+
+remove_socket(Socket, State) ->
+ case dict:find(Socket, State#httpc_man.sockets) of
+ {ok, Timer} ->
+ cancel_timer(Timer, Socket),
+ lhttpc_sock:close(Socket, State#httpc_man.ssl),
+ State#httpc_man{
+ sockets = dict:erase(Socket, State#httpc_man.sockets)
+ };
+ error ->
+ State
+ end.
+
+store_socket(Socket, State) ->
+ Timeout = State#httpc_man.timeout,
+ Timer = erlang:send_after(Timeout, self(), {timeout, Socket}),
+ lhttpc_sock:setopts(Socket, [{active, once}], State#httpc_man.ssl),
+ State#httpc_man{
+ idle_sockets = queue:in(Socket, State#httpc_man.idle_sockets),
+ sockets = dict:store(Socket, Timer, State#httpc_man.sockets)
+ }.
+
+close_sockets(Sockets, Ssl) ->
+ lists:foreach(fun({Socket, Timer}) ->
+ lhttpc_sock:close(Socket, Ssl),
+ erlang:cancel_timer(Timer)
+ end, dict:to_list(Sockets)).
+
+cancel_timer(Timer, Socket) ->
+ case erlang:cancel_timer(Timer) of
+ false ->
+ receive
+ {timeout, Socket} -> ok
+ after
+ 0 -> ok
+ end;
+ _ -> ok
+ end.
View
166 src/lhttpc_manager.erl
@@ -1,7 +1,7 @@
%%% ----------------------------------------------------------------------------
%%% Copyright (c) 2009, Erlang Training and Consulting Ltd.
%%% All rights reserved.
-%%%
+%%%
%%% Redistribution and use in source and binary forms, with or without
%%% modification, are permitted provided that the following conditions are met:
%%% * Redistributions of source code must retain the above copyright
@@ -12,7 +12,7 @@
%%% * Neither the name of Erlang Training and Consulting Ltd. nor the
%%% names of its contributors may be used to endorse or promote products
%%% derived from this software without specific prior written permission.
-%%%
+%%%
%%% THIS SOFTWARE IS PROVIDED BY Erlang Training and Consulting Ltd. ''AS IS''
%%% AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
%%% IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
@@ -36,10 +36,7 @@
-module(lhttpc_manager).
-export([
- start_link/0,
- connection_count/0,
- connection_count/1,
- update_connection_timeout/1
+ start_link/0
]).
-export([
init/1,
@@ -53,45 +50,9 @@
-behaviour(gen_server).
-record(httpc_man, {
- destinations = dict:new(),
- sockets = dict:new(),
- timeout = 300000 :: non_neg_integer()
+ destinations = dict:new()
}).
-%% @spec () -> Count
-%% Count = integer()
-%% @doc Returns the total number of active connections maintained by the
-%% httpc manager.
-%% @end
--spec connection_count() -> non_neg_integer().
-connection_count() ->
- gen_server:call(?MODULE, connection_count).
-
-%% @spec (Destination) -> Count
-%% Destination = {Host, Port, Ssl}
-%% Host = string()
-%% Port = integer()
-%% Ssl = boolean()
-%% Count = integer()
-%% @doc Returns the number of active connections to the specific
-%% `Destination' maintained by the httpc manager.
-%% @end
--spec connection_count({string(), pos_integer(), boolean()}) ->
- non_neg_integer().
-connection_count({Host, Port, Ssl}) ->
- Destination = {string:to_lower(Host), Port, Ssl},
- gen_server:call(?MODULE, {connection_count, Destination}).
-
-%% @spec (Timeout) -> ok
-%% Timeout = integer()
-%% @doc Updates the timeout for persistent connections.
-%% This will only affect future sockets handed to the manager. The sockets
-%% already managed will keep their timers.
-%% @end
--spec update_connection_timeout(non_neg_integer()) -> ok.
-update_connection_timeout(Milliseconds) ->
- gen_server:cast(?MODULE, {update_timeout, Milliseconds}).
-
%% @spec () -> {ok, pid()}
%% @doc Starts and link to the gen server.
%% This is normally called by a supervisor.
@@ -104,137 +65,54 @@ start_link() ->
-spec init(any()) -> {ok, #httpc_man{}}.
init(_) ->
process_flag(priority, high),
- {ok, Timeout} = application:get_env(lhttpc, connection_timeout),
- {ok, #httpc_man{timeout = Timeout}}.
+ {ok, #httpc_man{}}.
%% @hidden
-spec handle_call(any(), any(), #httpc_man{}) ->
{reply, any(), #httpc_man{}}.
-handle_call({socket, Pid, Host, Port, Ssl}, _, State) ->
- {Reply, NewState} = find_socket({Host, Port, Ssl}, Pid, State),
+handle_call({lb, Host, Port, Ssl}, _, State) ->
+ {Reply, NewState} = find_lb({Host, Port, Ssl}, State),
{reply, Reply, NewState};
-handle_call(connection_count, _, State) ->
- {reply, dict:size(State#httpc_man.sockets), State};
-handle_call({connection_count, Destination}, _, State) ->
- Count = case dict:find(Destination, State#httpc_man.destinations) of
- {ok, Sockets} -> length(Sockets);
- error -> 0
- end,
- {reply, Count, State};
handle_call(_, _, State) ->
{reply, {error, unknown_request}, State}.
%% @hidden
-spec handle_cast(any(), #httpc_man{}) -> {noreply, #httpc_man{}}.
-handle_cast({done, Host, Port, Ssl, Socket}, State) ->
- NewState = store_socket({Host, Port, Ssl}, Socket, State),
- {noreply, NewState};
-handle_cast({update_timeout, Milliseconds}, State) ->
- {noreply, State#httpc_man{timeout = Milliseconds}};
handle_cast(_, State) ->
{noreply, State}.
%% @hidden
-spec handle_info(any(), #httpc_man{}) -> {noreply, #httpc_man{}}.
-handle_info({tcp_closed, Socket}, State) ->
- {noreply, remove_socket(Socket, State)};
-handle_info({ssl_closed, Socket}, State) ->
- {noreply, remove_socket(Socket, State)};
-handle_info({timeout, Socket}, State) ->
- {noreply, remove_socket(Socket, State)};
-handle_info({tcp_error, Socket, _}, State) ->
- {noreply, remove_socket(Socket, State)};
-handle_info({ssl_error, Socket, _}, State) ->
- {noreply, remove_socket(Socket, State)};
-handle_info({tcp, Socket, _}, State) ->
- {noreply, remove_socket(Socket, State)}; % got garbage
-handle_info({ssl, Socket, _}, State) ->
- {noreply, remove_socket(Socket, State)}; % got garbage
handle_info(_, State) ->
{noreply, State}.
%% @hidden
-spec terminate(any(), #httpc_man{}) -> ok.
terminate(_, State) ->
- close_sockets(State#httpc_man.sockets).
+ close_lbs(State#httpc_man.destinations).
%% @hidden
-spec code_change(any(), #httpc_man{}, any()) -> #httpc_man{}.
code_change(_, State, _) ->
State.
-find_socket({_, _, Ssl} = Dest, Pid, State) ->
+find_lb(Dest, State) ->
Dests = State#httpc_man.destinations,
case dict:find(Dest, Dests) of
- {ok, [Socket | Sockets]} ->
- lhttpc_sock:setopts(Socket, [{active, false}], Ssl),
- case lhttpc_sock:controlling_process(Socket, Pid, Ssl) of
- ok ->
- {_, Timer} = dict:fetch(Socket, State#httpc_man.sockets),
- cancel_timer(Timer, Sockets),
- NewState = State#httpc_man{
- destinations = update_dest(Dest, Sockets, Dests),
- sockets = dict:erase(Socket, State#httpc_man.sockets)
- },
- {{ok, Socket}, NewState};
- {error, badarg} -> % Pid has timed out, reuse for someone else
- lhttpc_sock:setopts(Socket, [{active, once}], Ssl),
- {no_socket, State};
- _ -> % something wrong with the socket; remove it, try again
- find_socket(Dest, Pid, remove_socket(Socket, State))
- end;
- error ->
- {no_socket, State}
- end.
-
-remove_socket(Socket, State) ->
- Dests = State#httpc_man.destinations,
- case dict:find(Socket, State#httpc_man.sockets) of
- {ok, {{_, _, Ssl} = Dest, Timer}} ->
- cancel_timer(Timer, Socket),
- lhttpc_sock:close(Socket, Ssl),
- Sockets = lists:delete(Socket, dict:fetch(Dest, Dests)),
- State#httpc_man{
- destinations = update_dest(Dest, Sockets, Dests),
- sockets = dict:erase(Socket, State#httpc_man.sockets)
- };
+ {ok, Lb} ->
+ {{ok, Lb}, State};
error ->
- State
+ {ok, Pid} = lhttpc_lb:start_link(Dest),
+ NewState = State#httpc_man{
+ destinations = update_dest(Dest, Pid, Dests)
+ },
+ {{ok, Pid}, NewState}
end.
-store_socket({_, _, Ssl} = Dest, Socket, State) ->
- Timeout = State#httpc_man.timeout,
- Timer = erlang:send_after(Timeout, self(), {timeout, Socket}),
- % the socket might be closed from the other side
- lhttpc_sock:setopts(Socket, [{active, once}], Ssl),
- Dests = State#httpc_man.destinations,
- Sockets = case dict:find(Dest, Dests) of
- {ok, S} -> [Socket | S];
- error -> [Socket]
- end,
- State#httpc_man{
- destinations = dict:store(Dest, Sockets, Dests),
- sockets = dict:store(Socket, {Dest, Timer}, State#httpc_man.sockets)
- }.
+update_dest(Destination, Lb, Destinations) ->
+ dict:store(Destination, Lb, Destinations).
-update_dest(Destination, [], Destinations) ->
- dict:erase(Destination, Destinations);
-update_dest(Destination, Sockets, Destinations) ->
- dict:store(Destination, Sockets, Destinations).
-
-close_sockets(Sockets) ->
- lists:foreach(fun({Socket, {{_, _, Ssl}, Timer}}) ->
- lhttpc_sock:close(Socket, Ssl),
- erlang:cancel_timer(Timer)
- end, dict:to_list(Sockets)).
-
-cancel_timer(Timer, Socket) ->
- case erlang:cancel_timer(Timer) of
- false ->
- receive
- {timeout, Socket} -> ok
- after
- 0 -> ok
- end;
- _ -> ok
- end.
+close_lbs(Destinations) ->
+ lists:foreach(fun({_Dest, Lb}) ->
+ gen_server:cast(Lb, {terminate})
+ end, dict:to_list(Destinations)).
View
17 src/lhttpc_sock.erl
@@ -1,7 +1,7 @@
%%% ----------------------------------------------------------------------------
%%% Copyright (c) 2009, Erlang Training and Consulting Ltd.
%%% All rights reserved.
-%%%
+%%%
%%% Redistribution and use in source and binary forms, with or without
%%% modification, are permitted provided that the following conditions are met:
%%% * Redistributions of source code must retain the above copyright
@@ -12,7 +12,7 @@
%%% * Neither the name of Erlang Training and Consulting Ltd. nor the
%%% names of its contributors may be used to endorse or promote products
%%% derived from this software without specific prior written permission.
-%%%
+%%%
%%% THIS SOFTWARE IS PROVIDED BY Erlang Training and Consulting Ltd. ''AS IS''
%%% AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
%%% IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
@@ -63,6 +63,7 @@
connect(Host, Port, Options, Timeout, true) ->
ssl:connect(Host, Port, Options, Timeout);
connect(Host, Port, Options, Timeout, false) ->
+ error_logger:error_report([connect, Host, Port, Options, Timeout]),
gen_tcp:connect(Host, Port, Options, Timeout).
%% @spec (Socket, SslFlag) -> {ok, Data} | {error, Reason}
@@ -81,7 +82,11 @@ connect(Host, Port, Options, Timeout, false) ->
recv(Socket, true) ->
ssl:recv(Socket, 0);
recv(Socket, false) ->
- gen_tcp:recv(Socket, 0).
+ % error_logger:error_report([recv, Socket]),
+ % error_logger:error_report([inet_db, inet_db:lookup_socket(Socket)]),
+ X = inet_tcp:recv(Socket, 0),
+ error_logger:error_report(X),
+ X.
%% @spec (Socket, Length, SslFlag) -> {ok, Data} | {error, Reason}
%% Socket = socket()
@@ -99,6 +104,7 @@ recv(_, 0, _) ->
recv(Socket, Length, true) ->
ssl:recv(Socket, Length);
recv(Socket, Length, false) ->
+ error_logger:error_report([rcv, Socket, Length]),
gen_tcp:recv(Socket, Length).
%% @spec (Socket, Data, SslFlag) -> ok | {error, Reason}
@@ -115,6 +121,7 @@ recv(Socket, Length, false) ->
send(Socket, Request, true) ->
ssl:send(Socket, Request);
send(Socket, Request, false) ->
+ error_logger:error_report([send, Socket, Request]),
gen_tcp:send(Socket, Request).
%% @spec (Socket, Pid, SslFlag) -> ok | {error, Reason}
@@ -130,6 +137,8 @@ send(Socket, Request, false) ->
controlling_process(Socket, Pid, true) ->
ssl:controlling_process(Socket, Pid);
controlling_process(Socket, Pid, false) ->
+ error_logger:error_report([controlling_process, Socket, Pid]),
+ error_logger:error_report(process_info(Pid)),
gen_tcp:controlling_process(Socket, Pid).
%% @spec (Socket, Options, SslFlag) -> ok | {error, Reason}
@@ -145,6 +154,7 @@ controlling_process(Socket, Pid, false) ->
setopts(Socket, Options, true) ->
ssl:setopts(Socket, Options);
setopts(Socket, Options, false) ->
+ error_logger:error_report([setopts, Socket, Options]),
inet:setopts(Socket, Options).
%% @spec (Socket, SslFlag) -> ok | {error, Reason}
@@ -158,4 +168,5 @@ setopts(Socket, Options, false) ->
close(Socket, true) ->
ssl:close(Socket);
close(Socket, false) ->
+ error_logger:error_report([close, socket]),
gen_tcp:close(Socket).
Please sign in to comment.
Something went wrong with that request. Please try again.