Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Fix pour ETS. #1

Merged
merged 1 commit into from

2 participants

@ferd

J'ai fait quelques tests, et ça semble bien marcher, si tu veux vérifier.

@ferd ferd Changing the lhttpc load balancer to use ETS
The current implementation uses a dict and a queue for common
socket operations when load-balancing. Over heavy load, the process
gets to be very slow. Plus, it set itself as a high priority process,
unbalancing the whole VM.

This switches the dict to an ETS table, and the queue to a stack
(list) in order to reduce operations. Moreover, the process will
go back to a normal priority to make sure it doesn't mess up with
the schedulers and timers too much.
7d4a202
@lpgauth lpgauth merged commit 11fad66 into lpgauth:master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Nov 17, 2011
  1. @ferd

    Changing the lhttpc load balancer to use ETS

    ferd authored
    The current implementation uses a dict and a queue for common
    socket operations when load-balancing. Over heavy load, the process
    gets to be very slow. Plus, it set itself as a high priority process,
    unbalancing the whole VM.
    
    This switches the dict to an ETS table, and the queue to a stack
    (list) in order to reduce operations. Moreover, the process will
    go back to a normal priority to make sure it doesn't mess up with
    the schedulers and timers too much.
This page is out of date. Refresh to see the latest.
Showing with 38 additions and 46 deletions.
  1. +1 −0  src/lhttpc_client.erl
  2. +37 −46 src/lhttpc_lb.erl
View
1  src/lhttpc_client.erl
@@ -162,6 +162,7 @@ send_request(#client_state{socket = undefined} = State) ->
SocketRequest = {socket, self(), ConnectOptions, ConnectTimeout},
case gen_server:call(Lb, SocketRequest, infinity) of
{ok, Socket} ->
+ lhttpc_sock:setopts(Socket, [{active, false}], State#client_state.ssl),
send_request(State#client_state{socket = Socket});
{error, Reason} ->
throw(Reason)
View
83 src/lhttpc_lb.erl
@@ -20,8 +20,8 @@
ssl = false :: true | false,
max_connections = 10 :: non_neg_integer(),
connection_timeout = 300000 :: non_neg_integer(),
- sockets = dict:new(),
- available_sockets = queue:new()
+ sockets,
+ available_sockets = []
}).
%% @spec (any()) -> {ok, pid()}
@@ -35,13 +35,13 @@ start_link([Dest, Opts]) ->
%% @hidden
-spec init(any()) -> {ok, #httpc_man{}}.
init([{Host, Port, Ssl}, {MaxConnections, ConnectionTimeout}]) ->
- process_flag(priority, high),
State = #httpc_man{
host = Host,
port = Port,
ssl = Ssl,
max_connections = MaxConnections,
- connection_timeout = ConnectionTimeout
+ connection_timeout = ConnectionTimeout,
+ sockets = ets:new(sockets, [set])
},
{ok, State}.
@@ -101,33 +101,25 @@ find_socket(Pid, ConnectOptions, ConnectTimeout, State) ->
Host = State#httpc_man.host,
Port = State#httpc_man.port,
Ssl = State#httpc_man.ssl,
- Q1 = State#httpc_man.available_sockets,
- case queue:out(Q1) of
- {{value, Socket}, Q2} ->
- lhttpc_sock:setopts(Socket, [{active, false}], Ssl),
+ case State#httpc_man.available_sockets of
+ [Socket|Available] ->
case lhttpc_sock:controlling_process(Socket, Pid, Ssl) of
ok ->
- Timer = dict:fetch(Socket, State#httpc_man.sockets),
+ [{Socket,Timer}] = ets:lookup(State#httpc_man.sockets, Socket),
cancel_timer(Timer, Socket),
- NewState = State#httpc_man{
- available_sockets = Q2
- },
+ NewState = State#httpc_man{available_sockets = Available},
{{ok, Socket}, NewState};
{error, badarg} ->
lhttpc_sock:setopts(Socket, [{active, once}], Ssl),
- NewState = State#httpc_man{
- available_sockets = queue:in(Socket, Q2)
- },
- {{error, no_pid}, NewState};
+ {{error, no_pid}, State};
{error, _Reason} ->
- NewState = State#httpc_man{
- available_sockets = Q2
- },
+ NewState = State#httpc_man{available_sockets = Available},
find_socket(Pid, ConnectOptions, ConnectTimeout, remove_socket(Socket, NewState))
end;
- {empty, _Q2} ->
+ [] ->
MaxSockets = State#httpc_man.max_connections,
- case MaxSockets > dict:size(State#httpc_man.sockets) of
+ Size = ets:info(State#httpc_man.sockets, size),
+ case MaxSockets > Size andalso Size =/= undefined of
true ->
SocketOptions = [binary, {packet, http}, {active, false} | ConnectOptions],
case lhttpc_sock:connect(Host, Port, SocketOptions, ConnectTimeout, Ssl) of
@@ -140,42 +132,41 @@ find_socket(Pid, ConnectOptions, ConnectTimeout, State) ->
{error, Reason} ->
{{error, Reason}, State}
end;
- false ->
- {{error, retry_later}, State}
- end
+ false ->
+ {{error, retry_later}, State}
+ end
end.
remove_socket(Socket, State) ->
- case dict:find(Socket, State#httpc_man.sockets) of
- {ok, Timer} ->
+ case ets:lookup(State#httpc_man.sockets, Socket) of
+ [{Socket,Timer}] ->
cancel_timer(Timer, Socket),
lhttpc_sock:close(Socket, State#httpc_man.ssl),
- State#httpc_man{
- sockets = dict:erase(Socket, State#httpc_man.sockets)
- };
- error ->
- State
- end.
+ ets:delete(State#httpc_man.sockets, Socket);
+ [] ->
+ ok
+ end,
+ State.
store_socket(Socket, State) ->
Timeout = State#httpc_man.connection_timeout,
Timer = case Timeout of
- infinity ->
- undefined;
- _Other ->
- erlang:send_after(Timeout, self(), {timeout, Socket})
- end,
+ infinity -> undefined;
+ _Other -> erlang:send_after(Timeout, self(), {timeout, Socket})
+ end,
lhttpc_sock:setopts(Socket, [{active, once}], State#httpc_man.ssl),
- State#httpc_man{
- available_sockets = queue:in(Socket, State#httpc_man.available_sockets),
- sockets = dict:store(Socket, Timer, State#httpc_man.sockets)
- }.
+ ets:insert(State#httpc_man.sockets, {Socket, Timer}),
+ State#httpc_man{available_sockets = [Socket|State#httpc_man.available_sockets]}.
close_sockets(Sockets, Ssl) ->
- lists:foreach(fun({Socket, Timer}) ->
- lhttpc_sock:close(Socket, Ssl),
- erlang:cancel_timer(Timer)
- end, dict:to_list(Sockets)).
+ ets:foldl(
+ fun({Socket, undefined}, _) ->
+ lhttpc_sock:close(Socket, Ssl);
+ ({Socket, Timer}, _) ->
+ erlang:cancel_timer(Timer),
+ lhttpc_sock:close(Socket, Ssl)
+ end, ok, Sockets
+ ).
cancel_timer(undefined, _Socket) ->
ok;
@@ -187,5 +178,5 @@ cancel_timer(Timer, Socket) ->
after
0 -> ok
end;
- _ -> ok
+ _ -> ok
end.
Something went wrong with that request. Please try again.