Browse files

Changing the lhttpc load balancer to use ETS

The current implementation uses a dict and a queue for common
socket operations when load-balancing. Over heavy load, the process
gets to be very slow. Plus, it set itself as a high priority process,
unbalancing the whole VM.

This switches the dict to an ETS table, and the queue to a stack
(list) in order to reduce operations. Moreover, the process will
go back to a normal priority to make sure it doesn't mess up with
the schedulers and timers too much.
  • Loading branch information...
1 parent 1d745fe commit 7d4a202e21435a2f331f9d3db098b3d24bfc0e06 @ferd ferd committed Nov 17, 2011
Showing with 38 additions and 46 deletions.
  1. +1 −0 src/lhttpc_client.erl
  2. +37 −46 src/lhttpc_lb.erl
View
1 src/lhttpc_client.erl
@@ -162,6 +162,7 @@ send_request(#client_state{socket = undefined} = State) ->
SocketRequest = {socket, self(), ConnectOptions, ConnectTimeout},
case gen_server:call(Lb, SocketRequest, infinity) of
{ok, Socket} ->
+ lhttpc_sock:setopts(Socket, [{active, false}], State#client_state.ssl),
send_request(State#client_state{socket = Socket});
{error, Reason} ->
throw(Reason)
View
83 src/lhttpc_lb.erl
@@ -20,8 +20,8 @@
ssl = false :: true | false,
max_connections = 10 :: non_neg_integer(),
connection_timeout = 300000 :: non_neg_integer(),
- sockets = dict:new(),
- available_sockets = queue:new()
+ sockets,
+ available_sockets = []
}).
%% @spec (any()) -> {ok, pid()}
@@ -35,13 +35,13 @@ start_link([Dest, Opts]) ->
%% @hidden
-spec init(any()) -> {ok, #httpc_man{}}.
init([{Host, Port, Ssl}, {MaxConnections, ConnectionTimeout}]) ->
- process_flag(priority, high),
State = #httpc_man{
host = Host,
port = Port,
ssl = Ssl,
max_connections = MaxConnections,
- connection_timeout = ConnectionTimeout
+ connection_timeout = ConnectionTimeout,
+ sockets = ets:new(sockets, [set])
},
{ok, State}.
@@ -101,33 +101,25 @@ find_socket(Pid, ConnectOptions, ConnectTimeout, State) ->
Host = State#httpc_man.host,
Port = State#httpc_man.port,
Ssl = State#httpc_man.ssl,
- Q1 = State#httpc_man.available_sockets,
- case queue:out(Q1) of
- {{value, Socket}, Q2} ->
- lhttpc_sock:setopts(Socket, [{active, false}], Ssl),
+ case State#httpc_man.available_sockets of
+ [Socket|Available] ->
case lhttpc_sock:controlling_process(Socket, Pid, Ssl) of
ok ->
- Timer = dict:fetch(Socket, State#httpc_man.sockets),
+ [{Socket,Timer}] = ets:lookup(State#httpc_man.sockets, Socket),
cancel_timer(Timer, Socket),
- NewState = State#httpc_man{
- available_sockets = Q2
- },
+ NewState = State#httpc_man{available_sockets = Available},
{{ok, Socket}, NewState};
{error, badarg} ->
lhttpc_sock:setopts(Socket, [{active, once}], Ssl),
- NewState = State#httpc_man{
- available_sockets = queue:in(Socket, Q2)
- },
- {{error, no_pid}, NewState};
+ {{error, no_pid}, State};
{error, _Reason} ->
- NewState = State#httpc_man{
- available_sockets = Q2
- },
+ NewState = State#httpc_man{available_sockets = Available},
find_socket(Pid, ConnectOptions, ConnectTimeout, remove_socket(Socket, NewState))
end;
- {empty, _Q2} ->
+ [] ->
MaxSockets = State#httpc_man.max_connections,
- case MaxSockets > dict:size(State#httpc_man.sockets) of
+ Size = ets:info(State#httpc_man.sockets, size),
+ case MaxSockets > Size andalso Size =/= undefined of
true ->
SocketOptions = [binary, {packet, http}, {active, false} | ConnectOptions],
case lhttpc_sock:connect(Host, Port, SocketOptions, ConnectTimeout, Ssl) of
@@ -140,42 +132,41 @@ find_socket(Pid, ConnectOptions, ConnectTimeout, State) ->
{error, Reason} ->
{{error, Reason}, State}
end;
- false ->
- {{error, retry_later}, State}
- end
+ false ->
+ {{error, retry_later}, State}
+ end
end.
remove_socket(Socket, State) ->
- case dict:find(Socket, State#httpc_man.sockets) of
- {ok, Timer} ->
+ case ets:lookup(State#httpc_man.sockets, Socket) of
+ [{Socket,Timer}] ->
cancel_timer(Timer, Socket),
lhttpc_sock:close(Socket, State#httpc_man.ssl),
- State#httpc_man{
- sockets = dict:erase(Socket, State#httpc_man.sockets)
- };
- error ->
- State
- end.
+ ets:delete(State#httpc_man.sockets, Socket);
+ [] ->
+ ok
+ end,
+ State.
store_socket(Socket, State) ->
Timeout = State#httpc_man.connection_timeout,
Timer = case Timeout of
- infinity ->
- undefined;
- _Other ->
- erlang:send_after(Timeout, self(), {timeout, Socket})
- end,
+ infinity -> undefined;
+ _Other -> erlang:send_after(Timeout, self(), {timeout, Socket})
+ end,
lhttpc_sock:setopts(Socket, [{active, once}], State#httpc_man.ssl),
- State#httpc_man{
- available_sockets = queue:in(Socket, State#httpc_man.available_sockets),
- sockets = dict:store(Socket, Timer, State#httpc_man.sockets)
- }.
+ ets:insert(State#httpc_man.sockets, {Socket, Timer}),
+ State#httpc_man{available_sockets = [Socket|State#httpc_man.available_sockets]}.
close_sockets(Sockets, Ssl) ->
- lists:foreach(fun({Socket, Timer}) ->
- lhttpc_sock:close(Socket, Ssl),
- erlang:cancel_timer(Timer)
- end, dict:to_list(Sockets)).
+ ets:foldl(
+ fun({Socket, undefined}, _) ->
+ lhttpc_sock:close(Socket, Ssl);
+ ({Socket, Timer}, _) ->
+ erlang:cancel_timer(Timer),
+ lhttpc_sock:close(Socket, Ssl)
+ end, ok, Sockets
+ ).
cancel_timer(undefined, _Socket) ->
ok;
@@ -187,5 +178,5 @@ cancel_timer(Timer, Socket) ->
after
0 -> ok
end;
- _ -> ok
+ _ -> ok
end.

0 comments on commit 7d4a202

Please sign in to comment.