From 41f5ba0dd626133d03be45c41a014b92c08fa312 Mon Sep 17 00:00:00 2001 From: Louis-Philippe Gauthier Date: Tue, 19 Jul 2011 09:43:26 -0400 Subject: [PATCH] Initial work #1 - adding load balancer --- src/lhttpc.erl | 62 ++++++------- src/lhttpc_client.erl | 73 +++++++-------- src/lhttpc_lb.erl | 198 +++++++++++++++++++++++++++++++++++++++++ src/lhttpc_manager.erl | 166 +++++----------------------------- src/lhttpc_sock.erl | 17 +++- 5 files changed, 297 insertions(+), 219 deletions(-) create mode 100644 src/lhttpc_lb.erl diff --git a/src/lhttpc.erl b/src/lhttpc.erl index b175ee2e..9896f60c 100644 --- a/src/lhttpc.erl +++ b/src/lhttpc.erl @@ -1,7 +1,7 @@ %%% ---------------------------------------------------------------------------- %%% Copyright (c) 2009, Erlang Training and Consulting Ltd. %%% All rights reserved. -%%% +%%% %%% Redistribution and use in source and binary forms, with or without %%% modification, are permitted provided that the following conditions are met: %%% * Redistributions of source code must retain the above copyright @@ -12,7 +12,7 @@ %%% * Neither the name of Erlang Training and Consulting Ltd. nor the %%% names of its contributors may be used to endorse or promote products %%% derived from this software without specific prior written permission. -%%% +%%% %%% THIS SOFTWARE IS PROVIDED BY Erlang Training and Consulting Ltd. ''AS IS'' %%% AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE %%% IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE @@ -37,7 +37,7 @@ -export([start/2, stop/1]). -export([ send_body_part/2, - send_body_part/3, + send_body_part/3, send_trailers/2, send_trailers/3 ]). @@ -178,7 +178,7 @@ request(URL, Method, Hdrs, Body, Timeout) -> %% request(Host, Port, Path, Ssl, Method, Hdrs, Body, Timeout, Options). %% %% -%% `URL' is expected to be a valid URL: +%% `URL' is expected to be a valid URL: %% `scheme://host[:port][/path]'. %% @end %% @see request/9 @@ -229,13 +229,13 @@ request(URL, Method, Hdrs, Body, Timeout, Options) -> %% `Ssl' = `false'
%% `Path' = `"/foobar"'
%% `Path' must begin with a forward slash `/'. -%% +%% %% `Method' is either a string, stating the HTTP method exactly as in the %% protocol, i.e: `"POST"' or `"GET"'. It could also be an atom, which is %% then coverted to an uppercase (if it isn't already) string. %% %% `Hdrs' is a list of headers to send. Mandatory headers such as -%% `Host', `Content-Length' or `Transfer-Encoding' (for some requests) +%% `Host', `Content-Length' or `Transfer-Encoding' (for some requests) %% are added automatically. %% %% `Body' is the entity to send in the request. Please don't include entity @@ -254,7 +254,7 @@ request(URL, Method, Hdrs, Body, Timeout, Options) -> %% choose to give up earlier than the connect timeout, in which case the %% client will also give up. The default value is infinity, which means that %% it will either give up when the TCP stack gives up, or when the overall -%% request timeout is reached. +%% request timeout is reached. %% %% `{connect_options, Options}' specifies options to pass to the socket at %% connect time. This makes it possible to specify both SSL options and @@ -311,7 +311,7 @@ request(URL, Method, Hdrs, Body, Timeout, Options) -> %% pieces. Note however that the last piece might be smaller than `PartSize'. %% Size bounded entity bodies are handled the same way as unbounded ones if %% `PartSize' is `infinity'. If `PartSize' is integer it must be >= 0. -%% If `{partial_download, PartialDownloadOptions}' is specified the +%% If `{partial_download, PartialDownloadOptions}' is specified the %% `ResponseBody' will be a `pid()' unless the response has no body %% (for example in case of `HEAD' requests). In that case it will be be %% `undefined'. The functions {@link get_body_part/1} and @@ -325,18 +325,20 @@ request(Host, Port, Ssl, Path, Method, Hdrs, Body, Timeout, Options) -> Args = [self(), Host, Port, Ssl, Path, Method, Hdrs, Body, Options], Pid = spawn_link(lhttpc_client, request, Args), receive - {response, Pid, R} -> - R; - {exit, Pid, Reason} -> - % We would rather want to exit here, instead of letting the - % linked client send us an exit signal, since this can be - % caught by the caller. - exit(Reason); - {'EXIT', Pid, Reason} -> - % This could happen if the process we're running in taps exits - % and the client process exits due to some exit signal being - % sent to it. Very unlikely though - exit(Reason) + X -> + X + % {response, Pid, R} -> + % R; + % {exit, Pid, Reason} -> + % % We would rather want to exit here, instead of letting the + % % linked client send us an exit signal, since this can be + % % caught by the caller. + % exit(Reason); + % {'EXIT', Pid, Reason} -> + % % This could happen if the process we're running in taps exits + % % and the client process exits due to some exit signal being + % % sent to it. Very unlikely though + % exit(Reason) after Timeout -> kill_client(Pid) end. @@ -353,7 +355,7 @@ request(Host, Port, Ssl, Path, Method, Hdrs, Body, Timeout, Options) -> %% Would be the same as calling %% `send_body_part(UploadState, BodyPart, infinity)'. %% @end --spec send_body_part({pid(), window_size()}, iolist()) -> +-spec send_body_part({pid(), window_size()}, iolist()) -> {pid(), window_size()} | result(). send_body_part({Pid, Window}, IoList) -> send_body_part({Pid, Window}, IoList, infinity). @@ -371,15 +373,15 @@ send_body_part({Pid, Window}, IoList) -> %% milliseconds. If there is no acknowledgement received during that time the %% the request is cancelled and `{error, timeout}' is returned. %% -%% As long as the window size is larger than 0 the function will return +%% As long as the window size is larger than 0 the function will return %% immediately after sending the body part to the request handling process. -%% +%% %% The `BodyPart' `http_eob' signals an end of the entity body, the request %% is considered sent and the response will be read from the socket. If %% there is no response within `Timeout' milliseconds, the request is %% canceled and `{error, timeout}' is returned. %% @end --spec send_body_part({pid(), window_size()}, iolist(), timeout()) -> +-spec send_body_part({pid(), window_size()}, iolist(), timeout()) -> {ok, {pid(), window_size()}} | result(). send_body_part({Pid, _Window}, http_eob, Timeout) when is_pid(Pid) -> Pid ! {body_part, self(), http_eob}, @@ -422,7 +424,7 @@ send_body_part({Pid, Window}, IoList, _Timeout) when Window > 0, is_pid(Pid) -> %% @doc Sends trailers to an ongoing request when `{partial_upload, %% WindowSize}' is used and no `Content-Length' was specified. The default %% timout `infinity' will be used. Plase note that after this the request is -%% considered complete and the response will be read from the socket. +%% considered complete and the response will be read from the socket. %% Would be the same as calling %% `send_trailers(UploadState, BodyPart, infinity)'. %% @end @@ -450,7 +452,7 @@ send_trailers({Pid, Window}, Trailers) -> %% `Timeout' milliseconds the request is canceled and `{error, timeout}' is %% returned. %% @end --spec send_trailers({pid(), window_size()}, [{string() | string()}], +-spec send_trailers({pid(), window_size()}, [{string() | string()}], timeout()) -> result(). send_trailers({Pid, _Window}, Trailers, Timeout) when is_list(Trailers), is_pid(Pid) -> @@ -465,7 +467,7 @@ send_trailers({Pid, _Window}, Trailers, Timeout) %% Value = string() | binary() %% @doc Reads a body part from an ongoing response when %% `{partial_download, PartialDownloadOptions}' is used. The default timeout, -%% `infinity' will be used. +%% `infinity' will be used. %% Would be the same as calling %% `get_body_part(HTTPClient, infinity)'. %% @end @@ -482,11 +484,11 @@ get_body_part(Pid) -> %% Value = string() | binary() %% @doc Reads a body part from an ongoing response when %% `{partial_download, PartialDownloadOptions}' is used. -%% `Timeout' is the timeout for reading the next body part in milliseconds. +%% `Timeout' is the timeout for reading the next body part in milliseconds. %% `http_eob' marks the end of the body. If there were Trailers in the -%% response those are returned with `http_eob' as well. +%% response those are returned with `http_eob' as well. %% @end --spec get_body_part(pid(), timeout()) -> +-spec get_body_part(pid(), timeout()) -> {ok, binary()} | {ok, {http_eob, headers()}}. get_body_part(Pid, Timeout) -> receive diff --git a/src/lhttpc_client.erl b/src/lhttpc_client.erl index d93bfab3..ace08a65 100644 --- a/src/lhttpc_client.erl +++ b/src/lhttpc_client.erl @@ -45,6 +45,7 @@ method :: string(), request :: iolist(), request_headers :: headers(), + load_balancer:: pid(), socket, connect_timeout = infinity :: timeout(), connect_options = [] :: [any()], @@ -102,14 +103,12 @@ execute(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) -> PartialUpload = proplists:is_defined(partial_upload, Options), PartialDownload = proplists:is_defined(partial_download, Options), PartialDownloadOptions = proplists:get_value(partial_download, Options, []), + ConnectOptions = proplists:get_value(connect_options, Options, []), NormalizedMethod = lhttpc_lib:normalize_method(Method), {ChunkedUpload, Request} = lhttpc_lib:format_request(Path, NormalizedMethod, Hdrs, Host, Port, Body, PartialUpload), - SocketRequest = {socket, self(), Host, Port, Ssl}, - Socket = case gen_server:call(lhttpc_manager, SocketRequest, infinity) of - {ok, S} -> S; % Re-using HTTP/1.1 connections - no_socket -> undefined % Opening a new HTTP/1.1 connection - end, + LbRequest = {lb, Host, Port, Ssl}, + {ok, Lb} = gen_server:call(lhttpc_manager, LbRequest, infinity), State = #client_state{ host = Host, port = Port, @@ -118,10 +117,10 @@ execute(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) -> request = Request, requester = From, request_headers = Hdrs, - socket = Socket, + load_balancer = Lb, connect_timeout = proplists:get_value(connect_timeout, Options, infinity), - connect_options = proplists:get_value(connect_options, Options, []), + connect_options = ConnectOptions, attempts = 1 + proplists:get_value(send_retry, Options, 1), partial_upload = PartialUpload, upload_window = UploadWindowSize, @@ -136,19 +135,9 @@ execute(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) -> {R, undefined} -> {ok, R}; {R, NewSocket} -> - % The socket we ended up doing the request over is returned - % here, it might be the same as Socket, but we don't know. - % I've noticed that we don't want to give send sockets that we - % can't change the controlling process for to the manager. This - % really shouldn't fail, but it could do if: - % * The socket was closed remotely already - % * Due to an error in this module (returning dead sockets for - % instance) - ManagerPid = whereis(lhttpc_manager), - case lhttpc_sock:controlling_process(NewSocket, ManagerPid, Ssl) of + case lhttpc_sock:controlling_process(NewSocket, Lb, Ssl) of ok -> - gen_server:cast(lhttpc_manager, - {done, Host, Port, Ssl, NewSocket}); + gen_server:cast(Lb, {store, NewSocket}); _ -> ok end, @@ -157,45 +146,37 @@ execute(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) -> {response, self(), Response}. send_request(#client_state{attempts = 0}) -> - % Don't try again if the number of allowed attempts is 0. throw(connection_closed); send_request(#client_state{socket = undefined} = State) -> - Host = State#client_state.host, - Port = State#client_state.port, - Ssl = State#client_state.ssl, - Timeout = State#client_state.connect_timeout, ConnectOptions = State#client_state.connect_options, - SocketOptions = [binary, {packet, http}, {active, false} | ConnectOptions], - case lhttpc_sock:connect(Host, Port, SocketOptions, Timeout, Ssl) of + Lb = State#client_state.load_balancer, + SocketRequest = {socket, self(), ConnectOptions}, + case gen_server:call(Lb, SocketRequest, infinity) of {ok, Socket} -> send_request(State#client_state{socket = Socket}); - {error, etimedout} -> - % TCP stack decided to give up - throw(connect_timeout); - {error, timeout} -> - throw(connect_timeout); {error, Reason} -> - erlang:error(Reason) + throw(Reason) end; send_request(State) -> + Lb = State#client_state.load_balancer, Socket = State#client_state.socket, Ssl = State#client_state.ssl, Request = State#client_state.request, case lhttpc_sock:send(Socket, Request, Ssl) of ok -> if - State#client_state.partial_upload -> partial_upload(State); + State#client_state.partial_upload -> partial_upload(State); not State#client_state.partial_upload -> read_response(State) end; {error, closed} -> - lhttpc_sock:close(Socket, Ssl), + gen_server:cast(Lb, {remove, Socket}), NewState = State#client_state{ socket = undefined, attempts = State#client_state.attempts - 1 }, send_request(NewState); {error, Reason} -> - lhttpc_sock:close(Socket, Ssl), + gen_server:cast(Lb, {remove, Socket}), erlang:error(Reason) end. @@ -242,8 +223,8 @@ encode_body_part(#client_state{chunked_upload = false}, Data) -> check_send_result(_State, ok) -> ok; -check_send_result(#client_state{socket = Sock, ssl = Ssl}, {error, Reason}) -> - lhttpc_sock:close(Sock, Ssl), +check_send_result(#client_state{socket = Socket, load_balancer = Lb}, {error, Reason}) -> + gen_server:cast(Lb, {remove, Socket}), throw(Reason). read_response(#client_state{socket = Socket, ssl = Ssl} = State) -> @@ -251,6 +232,7 @@ read_response(#client_state{socket = Socket, ssl = Ssl} = State) -> read_response(State, nil, {nil, nil}, []). read_response(State, Vsn, {StatusCode, _} = Status, Hdrs) -> + Lb = State#client_state.load_balancer, Socket = State#client_state.socket, Ssl = State#client_state.ssl, case lhttpc_sock:recv(Socket, Ssl) of @@ -281,14 +263,19 @@ read_response(State, Vsn, {StatusCode, _} = Status, Hdrs) -> % the request on the wire or the server has some issues and is % closing connections without sending responses. % If this the first attempt to send the request, we will try again. - lhttpc_sock:close(Socket, Ssl), + gen_server:cast(Lb, {remove, Socket}), NewState = State#client_state{ socket = undefined, attempts = State#client_state.attempts - 1 }, send_request(NewState); - {error, Reason} -> - erlang:error(Reason) + {error, timeout} -> + gen_server:cast(Lb, {remove, Socket}), + NewState = State#client_state{ + socket = undefined, + attempts = 0 + }, + send_request(NewState) end. handle_response_body(#client_state{partial_download = false} = State, Vsn, @@ -408,7 +395,7 @@ read_body_part(#client_state{part_size = infinity} = State, _ContentLength) -> end; read_body_part(#client_state{part_size = PartSize} = State, ContentLength) when PartSize =< ContentLength -> - Socket = State#client_state.socket, + Socket = State#client_state.socket, Ssl = State#client_state.ssl, PartSize = State#client_state.part_size, case lhttpc_sock:recv(Socket, PartSize, Ssl) of @@ -419,7 +406,7 @@ read_body_part(#client_state{part_size = PartSize} = State, ContentLength) end; read_body_part(#client_state{part_size = PartSize} = State, ContentLength) when PartSize > ContentLength -> - Socket = State#client_state.socket, + Socket = State#client_state.socket, Ssl = State#client_state.ssl, case lhttpc_sock:recv(Socket, ContentLength, Ssl) of {ok, Data} -> @@ -630,6 +617,7 @@ maybe_close_socket(Socket, Ssl, {1, Minor}, ReqHdrs, RespHdrs) when Minor >= 1-> ServerConnection = ?CONNECTION_HDR(RespHdrs, "keep-alive"), if ClientConnection =:= "close"; ServerConnection =:= "close" -> + error_logger:error_report("close"), lhttpc_sock:close(Socket, Ssl), undefined; ClientConnection =/= "close", ServerConnection =/= "close" -> @@ -640,6 +628,7 @@ maybe_close_socket(Socket, Ssl, _, ReqHdrs, RespHdrs) -> ServerConnection = ?CONNECTION_HDR(RespHdrs, "close"), if ClientConnection =:= "close"; ServerConnection =/= "keep-alive" -> + error_logger:error_report("close"), lhttpc_sock:close(Socket, Ssl), undefined; ClientConnection =/= "close", ServerConnection =:= "keep-alive" -> diff --git a/src/lhttpc_lb.erl b/src/lhttpc_lb.erl new file mode 100644 index 00000000..32e7e6ed --- /dev/null +++ b/src/lhttpc_lb.erl @@ -0,0 +1,198 @@ +-module(lhttpc_lb). + +-export([ + start_link/1 + ]). +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3, + terminate/2 + ]). + +-behaviour(gen_server). + +-record(httpc_man, { + host :: string(), + port = 80 :: integer(), + ssl = false :: true | false, + sockets = dict:new(), + idle_sockets = queue:new(), + timeout = 1000000 :: non_neg_integer(), + max_sockets = 1 :: non_neg_integer() + }). + +%% @spec () -> {ok, pid()} +%% @doc Starts and link to the gen server. +%% This is normally called by a supervisor. +%% @end +-spec start_link(any()) -> {ok, pid()}. +start_link(Args) -> + gen_server:start_link(?MODULE, Args, []). + +%% @hidden +-spec init(any()) -> {ok, #httpc_man{}}. +init({Host, Port, Ssl}) -> + process_flag(priority, high), + {ok, Timeout} = application:get_env(lhttpc, connection_timeout), + State = #httpc_man{ + host = Host, + port = Port, + ssl = Ssl, + timeout = Timeout + }, + {ok, State}. + +%% @hidden +-spec handle_call(any(), any(), #httpc_man{}) -> + {reply, any(), #httpc_man{}}. +handle_call({socket, Pid, ConnectOptions}, _, State) -> + error_logger:error_report(find), + {Reply, NewState} = find_socket(Pid, ConnectOptions, State), + {reply, Reply, NewState}; +handle_call(_, _, State) -> + {reply, {error, unknown_request}, State}. + +%% @hidden +-spec handle_cast(any(), #httpc_man{}) -> {noreply, #httpc_man{}}. +handle_cast({remove, Socket}, State) -> + error_logger:error_report(remove), + NewState = remove_socket(Socket, State), + {noreply, NewState}; +handle_cast({store, Socket}, State) -> + error_logger:error_report(store), + NewState = store_socket(Socket, State), + {noreply, NewState}; +handle_cast({terminate}, State) -> + terminate(undefined, State), + {noreply, State}; +handle_cast({update_timeout, Milliseconds}, State) -> + {noreply, State#httpc_man{timeout = Milliseconds}}; +handle_cast(_, State) -> + {noreply, State}. + +%% @hidden +-spec handle_info(any(), #httpc_man{}) -> {noreply, #httpc_man{}}. +handle_info({tcp_closed, Socket}, State) -> + error_logger:error_report(tcp_closed), + {noreply, remove_socket(Socket, State)}; +handle_info({ssl_closed, Socket}, State) -> + error_logger:error_report(ssl_closed), + {noreply, remove_socket(Socket, State)}; +handle_info({timeout, Socket}, State) -> + error_logger:error_report(timeout), + {noreply, remove_socket(Socket, State)}; +handle_info({tcp_error, Socket, _}, State) -> + error_logger:error_report(tcp_error), + {noreply, remove_socket(Socket, State)}; +handle_info({ssl_error, Socket, _}, State) -> + error_logger:error_report(ssl_error), + {noreply, remove_socket(Socket, State)}; +handle_info({tcp, Socket, _}, State) -> + error_logger:error_report(tcp), + {noreply, remove_socket(Socket, State)}; % got garbage +handle_info({ssl, Socket, _}, State) -> + error_logger:error_report(ssl), + {noreply, remove_socket(Socket, State)}; % got garbage +handle_info(_, State) -> + error_logger:error_report(other), + {noreply, State}. + +%% @hidden +-spec terminate(any(), #httpc_man{}) -> ok. +terminate(_, State) -> + close_sockets(State#httpc_man.sockets, State#httpc_man.ssl). + +%% @hidden +-spec code_change(any(), #httpc_man{}, any()) -> #httpc_man{}. +code_change(_, State, _) -> + State. + +find_socket(Pid, ConnectOptions, State) -> + Host = State#httpc_man.host, + Port = State#httpc_man.port, + Ssl = State#httpc_man.ssl, + Q1 = State#httpc_man.idle_sockets, + case queue:out(Q1) of + {{value, Socket}, Q2} -> + lhttpc_sock:setopts(Socket, [{active, false}], Ssl), + case lhttpc_sock:controlling_process(Socket, Pid, Ssl) of + ok -> + Timer = dict:fetch(Socket, State#httpc_man.sockets), + cancel_timer(Timer, Socket), + NewState = State#httpc_man{ + idle_sockets = Q2 + }, + {{ok, Socket}, NewState}; + {error, badarg} -> + lhttpc_sock:setopts(Socket, [{active, once}], Ssl), + NewState = State#httpc_man{ + idle_sockets = queue:in(Socket, Q2) + }, + {{error, no_pid}, NewState}; + {error, _Reason} -> + NewState = State#httpc_man{ + idle_sockets = Q2 + }, + find_socket(Pid, ConnectOptions, remove_socket(Socket, NewState)) + end; + {empty, _Q2} -> + MaxSockets = State#httpc_man.max_sockets, + case MaxSockets > dict:size(State#httpc_man.sockets) of + true -> + Timeout = State#httpc_man.timeout, + SocketOptions = [binary, {packet, http}, {active, false} | ConnectOptions], + case lhttpc_sock:connect(Host, Port, SocketOptions, 1000, Ssl) of + {ok, Socket} -> + find_socket(Pid, ConnectOptions, store_socket(Socket, State)); + {error, etimedout} -> + {{error, sys_timeout}, State}; + {error, timeout} -> + {{error, timeout}, State}; + {error, Reason} -> + {{error, Reason}, State} + end; + false -> + {{error, retry_later}, State} + end + end. + +remove_socket(Socket, State) -> + case dict:find(Socket, State#httpc_man.sockets) of + {ok, Timer} -> + cancel_timer(Timer, Socket), + lhttpc_sock:close(Socket, State#httpc_man.ssl), + State#httpc_man{ + sockets = dict:erase(Socket, State#httpc_man.sockets) + }; + error -> + State + end. + +store_socket(Socket, State) -> + Timeout = State#httpc_man.timeout, + Timer = erlang:send_after(Timeout, self(), {timeout, Socket}), + lhttpc_sock:setopts(Socket, [{active, once}], State#httpc_man.ssl), + State#httpc_man{ + idle_sockets = queue:in(Socket, State#httpc_man.idle_sockets), + sockets = dict:store(Socket, Timer, State#httpc_man.sockets) + }. + +close_sockets(Sockets, Ssl) -> + lists:foreach(fun({Socket, Timer}) -> + lhttpc_sock:close(Socket, Ssl), + erlang:cancel_timer(Timer) + end, dict:to_list(Sockets)). + +cancel_timer(Timer, Socket) -> + case erlang:cancel_timer(Timer) of + false -> + receive + {timeout, Socket} -> ok + after + 0 -> ok + end; + _ -> ok + end. diff --git a/src/lhttpc_manager.erl b/src/lhttpc_manager.erl index d8bb111d..76920cf6 100644 --- a/src/lhttpc_manager.erl +++ b/src/lhttpc_manager.erl @@ -1,7 +1,7 @@ %%% ---------------------------------------------------------------------------- %%% Copyright (c) 2009, Erlang Training and Consulting Ltd. %%% All rights reserved. -%%% +%%% %%% Redistribution and use in source and binary forms, with or without %%% modification, are permitted provided that the following conditions are met: %%% * Redistributions of source code must retain the above copyright @@ -12,7 +12,7 @@ %%% * Neither the name of Erlang Training and Consulting Ltd. nor the %%% names of its contributors may be used to endorse or promote products %%% derived from this software without specific prior written permission. -%%% +%%% %%% THIS SOFTWARE IS PROVIDED BY Erlang Training and Consulting Ltd. ''AS IS'' %%% AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE %%% IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE @@ -36,10 +36,7 @@ -module(lhttpc_manager). -export([ - start_link/0, - connection_count/0, - connection_count/1, - update_connection_timeout/1 + start_link/0 ]). -export([ init/1, @@ -53,45 +50,9 @@ -behaviour(gen_server). -record(httpc_man, { - destinations = dict:new(), - sockets = dict:new(), - timeout = 300000 :: non_neg_integer() + destinations = dict:new() }). -%% @spec () -> Count -%% Count = integer() -%% @doc Returns the total number of active connections maintained by the -%% httpc manager. -%% @end --spec connection_count() -> non_neg_integer(). -connection_count() -> - gen_server:call(?MODULE, connection_count). - -%% @spec (Destination) -> Count -%% Destination = {Host, Port, Ssl} -%% Host = string() -%% Port = integer() -%% Ssl = boolean() -%% Count = integer() -%% @doc Returns the number of active connections to the specific -%% `Destination' maintained by the httpc manager. -%% @end --spec connection_count({string(), pos_integer(), boolean()}) -> - non_neg_integer(). -connection_count({Host, Port, Ssl}) -> - Destination = {string:to_lower(Host), Port, Ssl}, - gen_server:call(?MODULE, {connection_count, Destination}). - -%% @spec (Timeout) -> ok -%% Timeout = integer() -%% @doc Updates the timeout for persistent connections. -%% This will only affect future sockets handed to the manager. The sockets -%% already managed will keep their timers. -%% @end --spec update_connection_timeout(non_neg_integer()) -> ok. -update_connection_timeout(Milliseconds) -> - gen_server:cast(?MODULE, {update_timeout, Milliseconds}). - %% @spec () -> {ok, pid()} %% @doc Starts and link to the gen server. %% This is normally called by a supervisor. @@ -104,137 +65,54 @@ start_link() -> -spec init(any()) -> {ok, #httpc_man{}}. init(_) -> process_flag(priority, high), - {ok, Timeout} = application:get_env(lhttpc, connection_timeout), - {ok, #httpc_man{timeout = Timeout}}. + {ok, #httpc_man{}}. %% @hidden -spec handle_call(any(), any(), #httpc_man{}) -> {reply, any(), #httpc_man{}}. -handle_call({socket, Pid, Host, Port, Ssl}, _, State) -> - {Reply, NewState} = find_socket({Host, Port, Ssl}, Pid, State), +handle_call({lb, Host, Port, Ssl}, _, State) -> + {Reply, NewState} = find_lb({Host, Port, Ssl}, State), {reply, Reply, NewState}; -handle_call(connection_count, _, State) -> - {reply, dict:size(State#httpc_man.sockets), State}; -handle_call({connection_count, Destination}, _, State) -> - Count = case dict:find(Destination, State#httpc_man.destinations) of - {ok, Sockets} -> length(Sockets); - error -> 0 - end, - {reply, Count, State}; handle_call(_, _, State) -> {reply, {error, unknown_request}, State}. %% @hidden -spec handle_cast(any(), #httpc_man{}) -> {noreply, #httpc_man{}}. -handle_cast({done, Host, Port, Ssl, Socket}, State) -> - NewState = store_socket({Host, Port, Ssl}, Socket, State), - {noreply, NewState}; -handle_cast({update_timeout, Milliseconds}, State) -> - {noreply, State#httpc_man{timeout = Milliseconds}}; handle_cast(_, State) -> {noreply, State}. %% @hidden -spec handle_info(any(), #httpc_man{}) -> {noreply, #httpc_man{}}. -handle_info({tcp_closed, Socket}, State) -> - {noreply, remove_socket(Socket, State)}; -handle_info({ssl_closed, Socket}, State) -> - {noreply, remove_socket(Socket, State)}; -handle_info({timeout, Socket}, State) -> - {noreply, remove_socket(Socket, State)}; -handle_info({tcp_error, Socket, _}, State) -> - {noreply, remove_socket(Socket, State)}; -handle_info({ssl_error, Socket, _}, State) -> - {noreply, remove_socket(Socket, State)}; -handle_info({tcp, Socket, _}, State) -> - {noreply, remove_socket(Socket, State)}; % got garbage -handle_info({ssl, Socket, _}, State) -> - {noreply, remove_socket(Socket, State)}; % got garbage handle_info(_, State) -> {noreply, State}. %% @hidden -spec terminate(any(), #httpc_man{}) -> ok. terminate(_, State) -> - close_sockets(State#httpc_man.sockets). + close_lbs(State#httpc_man.destinations). %% @hidden -spec code_change(any(), #httpc_man{}, any()) -> #httpc_man{}. code_change(_, State, _) -> State. -find_socket({_, _, Ssl} = Dest, Pid, State) -> +find_lb(Dest, State) -> Dests = State#httpc_man.destinations, case dict:find(Dest, Dests) of - {ok, [Socket | Sockets]} -> - lhttpc_sock:setopts(Socket, [{active, false}], Ssl), - case lhttpc_sock:controlling_process(Socket, Pid, Ssl) of - ok -> - {_, Timer} = dict:fetch(Socket, State#httpc_man.sockets), - cancel_timer(Timer, Sockets), - NewState = State#httpc_man{ - destinations = update_dest(Dest, Sockets, Dests), - sockets = dict:erase(Socket, State#httpc_man.sockets) - }, - {{ok, Socket}, NewState}; - {error, badarg} -> % Pid has timed out, reuse for someone else - lhttpc_sock:setopts(Socket, [{active, once}], Ssl), - {no_socket, State}; - _ -> % something wrong with the socket; remove it, try again - find_socket(Dest, Pid, remove_socket(Socket, State)) - end; - error -> - {no_socket, State} - end. - -remove_socket(Socket, State) -> - Dests = State#httpc_man.destinations, - case dict:find(Socket, State#httpc_man.sockets) of - {ok, {{_, _, Ssl} = Dest, Timer}} -> - cancel_timer(Timer, Socket), - lhttpc_sock:close(Socket, Ssl), - Sockets = lists:delete(Socket, dict:fetch(Dest, Dests)), - State#httpc_man{ - destinations = update_dest(Dest, Sockets, Dests), - sockets = dict:erase(Socket, State#httpc_man.sockets) - }; + {ok, Lb} -> + {{ok, Lb}, State}; error -> - State + {ok, Pid} = lhttpc_lb:start_link(Dest), + NewState = State#httpc_man{ + destinations = update_dest(Dest, Pid, Dests) + }, + {{ok, Pid}, NewState} end. -store_socket({_, _, Ssl} = Dest, Socket, State) -> - Timeout = State#httpc_man.timeout, - Timer = erlang:send_after(Timeout, self(), {timeout, Socket}), - % the socket might be closed from the other side - lhttpc_sock:setopts(Socket, [{active, once}], Ssl), - Dests = State#httpc_man.destinations, - Sockets = case dict:find(Dest, Dests) of - {ok, S} -> [Socket | S]; - error -> [Socket] - end, - State#httpc_man{ - destinations = dict:store(Dest, Sockets, Dests), - sockets = dict:store(Socket, {Dest, Timer}, State#httpc_man.sockets) - }. +update_dest(Destination, Lb, Destinations) -> + dict:store(Destination, Lb, Destinations). -update_dest(Destination, [], Destinations) -> - dict:erase(Destination, Destinations); -update_dest(Destination, Sockets, Destinations) -> - dict:store(Destination, Sockets, Destinations). - -close_sockets(Sockets) -> - lists:foreach(fun({Socket, {{_, _, Ssl}, Timer}}) -> - lhttpc_sock:close(Socket, Ssl), - erlang:cancel_timer(Timer) - end, dict:to_list(Sockets)). - -cancel_timer(Timer, Socket) -> - case erlang:cancel_timer(Timer) of - false -> - receive - {timeout, Socket} -> ok - after - 0 -> ok - end; - _ -> ok - end. +close_lbs(Destinations) -> + lists:foreach(fun({_Dest, Lb}) -> + gen_server:cast(Lb, {terminate}) + end, dict:to_list(Destinations)). \ No newline at end of file diff --git a/src/lhttpc_sock.erl b/src/lhttpc_sock.erl index b742fafa..043c750a 100644 --- a/src/lhttpc_sock.erl +++ b/src/lhttpc_sock.erl @@ -1,7 +1,7 @@ %%% ---------------------------------------------------------------------------- %%% Copyright (c) 2009, Erlang Training and Consulting Ltd. %%% All rights reserved. -%%% +%%% %%% Redistribution and use in source and binary forms, with or without %%% modification, are permitted provided that the following conditions are met: %%% * Redistributions of source code must retain the above copyright @@ -12,7 +12,7 @@ %%% * Neither the name of Erlang Training and Consulting Ltd. nor the %%% names of its contributors may be used to endorse or promote products %%% derived from this software without specific prior written permission. -%%% +%%% %%% THIS SOFTWARE IS PROVIDED BY Erlang Training and Consulting Ltd. ''AS IS'' %%% AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE %%% IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE @@ -63,6 +63,7 @@ connect(Host, Port, Options, Timeout, true) -> ssl:connect(Host, Port, Options, Timeout); connect(Host, Port, Options, Timeout, false) -> + error_logger:error_report([connect, Host, Port, Options, Timeout]), gen_tcp:connect(Host, Port, Options, Timeout). %% @spec (Socket, SslFlag) -> {ok, Data} | {error, Reason} @@ -81,7 +82,11 @@ connect(Host, Port, Options, Timeout, false) -> recv(Socket, true) -> ssl:recv(Socket, 0); recv(Socket, false) -> - gen_tcp:recv(Socket, 0). + % error_logger:error_report([recv, Socket]), + % error_logger:error_report([inet_db, inet_db:lookup_socket(Socket)]), + X = inet_tcp:recv(Socket, 0), + error_logger:error_report(X), + X. %% @spec (Socket, Length, SslFlag) -> {ok, Data} | {error, Reason} %% Socket = socket() @@ -99,6 +104,7 @@ recv(_, 0, _) -> recv(Socket, Length, true) -> ssl:recv(Socket, Length); recv(Socket, Length, false) -> + error_logger:error_report([rcv, Socket, Length]), gen_tcp:recv(Socket, Length). %% @spec (Socket, Data, SslFlag) -> ok | {error, Reason} @@ -115,6 +121,7 @@ recv(Socket, Length, false) -> send(Socket, Request, true) -> ssl:send(Socket, Request); send(Socket, Request, false) -> + error_logger:error_report([send, Socket, Request]), gen_tcp:send(Socket, Request). %% @spec (Socket, Pid, SslFlag) -> ok | {error, Reason} @@ -130,6 +137,8 @@ send(Socket, Request, false) -> controlling_process(Socket, Pid, true) -> ssl:controlling_process(Socket, Pid); controlling_process(Socket, Pid, false) -> + error_logger:error_report([controlling_process, Socket, Pid]), + error_logger:error_report(process_info(Pid)), gen_tcp:controlling_process(Socket, Pid). %% @spec (Socket, Options, SslFlag) -> ok | {error, Reason} @@ -145,6 +154,7 @@ controlling_process(Socket, Pid, false) -> setopts(Socket, Options, true) -> ssl:setopts(Socket, Options); setopts(Socket, Options, false) -> + error_logger:error_report([setopts, Socket, Options]), inet:setopts(Socket, Options). %% @spec (Socket, SslFlag) -> ok | {error, Reason} @@ -158,4 +168,5 @@ setopts(Socket, Options, false) -> close(Socket, true) -> ssl:close(Socket); close(Socket, false) -> + error_logger:error_report([close, socket]), gen_tcp:close(Socket).