Skip to content

Commit

Permalink
Migrated to ETS for qcache
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Rowe committed Aug 22, 2012
1 parent 1195145 commit 0eef65d
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 137 deletions.
21 changes: 8 additions & 13 deletions README.md
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -33,19 +33,18 @@ connection, do this:
{<<"exchange">>, [{encoding,<<"application/bson">>}]} {<<"exchange">>, [{encoding,<<"application/bson">>}]}
``` ```


Based on the spec, a connection will be made and all bus handles will be Based on the spec, a connection will be made and the bus handles will be
cached in memory by a separate server. These can be accessed in code by cached in memory. These can be accessed in code by a call to `qcache`:
a call to `qcache`:


```erlang ```erlang
qcache:get_bus(CachePid, <<"exchange">>) qcache:get_bus(Tid, <<"exchange">>)
``` ```


The id must be the same as when configuring the connection. Don't mix and The id must be the same as when configuring the connection. Don't mix and
match as this will yield unexpected results. match as this will yield unexpected results.


The `CachePid` is how your implementation can access its cache. The pid is The `Tid` is how your implementation can access its cache. This id is
provided to your module via the init callback. If no explicit publish operations provided to your module via the new() function. If no explicit publish operations
are implemented, then this argument can be safely ignored. are implemented, then this argument can be safely ignored.


The `qcache` module supports other forms for more granular used. The `qcache` module supports other forms for more granular used.
Expand Down Expand Up @@ -285,14 +284,10 @@ routing key. Options are specified as a proplist. Current options include


The encoding property is only available for pub channels. The encoding property is only available for pub channels.


## Performance
The qcache model is slow due to the gen_server call. This will be moved to an
ETS table to manage all connections.

### Processes ### Processes
A gen_qserver and gen_qfsm both utilize a qcache, so each server without any A gen_qserver and gen_qfsm both utilize a qcache, which is now backed by ETS,
connections defined creates 2 processes. Once this is moved to ETS, it will so each server without any connections creates a single process. Rabbit
be 1 process per server. itself will create many more processes.


## Future ## Future


Expand Down
2 changes: 1 addition & 1 deletion src/gen_qfsm.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ connect(Exchange) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% GEN_SERVER %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% GEN_SERVER %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%


init([Module, Args, ConnSpecs]) -> init([Module, Args, ConnSpecs]) ->
{ok,Pid} = qcache:start_link(), {ok,Pid} = qcache:new(),
Handles = lists:map(fun(Conn) -> connect(Conn) end, ConnSpecs), Handles = lists:map(fun(Conn) -> connect(Conn) end, ConnSpecs),
qcache:put_conns(Pid, Handles), qcache:put_conns(Pid, Handles),
random:seed(now()), random:seed(now()),
Expand Down
14 changes: 9 additions & 5 deletions src/gen_qserver.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -85,11 +85,15 @@ connect({MaybeX, MaybeK}) ->
Tag = tag(), Tag = tag(),
bunny_farm:consume(Handle, [{consumer_tag,Tag}]), bunny_farm:consume(Handle, [{consumer_tag,Tag}]),
Exchange = case MaybeX of Exchange = case MaybeX of
{Exch,_Os} -> Exch; {Exch,_} -> Exch;
Exch -> Exch Exch -> Exch
end, end,
Key = case MaybeK of
{K,_} -> K;
K -> K
end,
%error_logger:info_msg("[gen_qserver] Returning handle spec"), %error_logger:info_msg("[gen_qserver] Returning handle spec"),
[{id,Exchange}, {tag,Tag}, {handle,Handle}]; [{id,{Exchange,Key}}, {tag,Tag}, {handle,Handle}];


%% Publish with no options %% Publish with no options
connect(<<X/binary>>) -> connect({X,[]}). connect(<<X/binary>>) -> connect({X,[]}).
Expand Down Expand Up @@ -122,7 +126,7 @@ response({stop, Reason, Reply, ModState}, #gen_qstate{}=State) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% GEN_SERVER %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% GEN_SERVER %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%


init([Module, Args, ConnSpecs]) -> init([Module, Args, ConnSpecs]) ->
{ok,Pid} = qcache:start_link(), {ok,Pid} = qcache:new(),
Handles = lists:map(fun(Conn) -> connect(Conn) end, ConnSpecs), Handles = lists:map(fun(Conn) -> connect(Conn) end, ConnSpecs),
qcache:put_conns(Pid, Handles), qcache:put_conns(Pid, Handles),
random:seed(now()), random:seed(now()),
Expand All @@ -141,7 +145,7 @@ init([Module, Args, ConnSpecs]) ->
%% Add an override for the encoding. All received messages will use this instead %% Add an override for the encoding. All received messages will use this instead
%% of what the message content-type specifies. %% of what the message content-type specifies.
init([Module, Args, ConnSpecs, Encoding]) -> init([Module, Args, ConnSpecs, Encoding]) ->
{ok,Pid} = qcache:start_link(), {ok,Pid} = qcache:new(),
Handles = lists:map(fun(Conn) -> connect(Conn) end, ConnSpecs), Handles = lists:map(fun(Conn) -> connect(Conn) end, ConnSpecs),
qcache:put_conns(Pid, Handles), qcache:put_conns(Pid, Handles),
random:seed(now()), random:seed(now()),
Expand Down Expand Up @@ -229,7 +233,7 @@ terminate(Reason, #gen_qstate{cache_pid=CachePid}=State) ->
bunny_farm:close(?PV(handle,PList), ?PV(tag,PList)) bunny_farm:close(?PV(handle,PList), ?PV(tag,PList))
end, end,
lists:map(Fn, Handles), lists:map(Fn, Handles),
gen_server:cast(CachePid,stop), qcache:delete(CachePid),
ok. ok.


code_change(_OldVersion, State, _Extra) -> code_change(_OldVersion, State, _Extra) ->
Expand Down
173 changes: 57 additions & 116 deletions src/qcache.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -13,150 +13,91 @@
%% limitations under the License. %% limitations under the License.


-module(qcache). -module(qcache).
-behaviour(gen_server).
-include("bunny_farm.hrl"). -include("bunny_farm.hrl").
-include("private_macros.hrl"). -include("private_macros.hrl").
-compile([{parse_transform,lager_transform}]). -compile([{parse_transform,lager_transform}]).
-export([start_link/0, init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2, code_change/3]).
-export([get_bus/2, get_conn/2, -export([get_bus/2, get_conn/2,
put_conn/2, put_conns/2, put_conn/2, put_conns/2,
activate/2, activate/2,
connections/1 ]). connections/1,
new/0, delete/1]).


-record(state, {handles=[]}).
-record(qconn, {id, tag, active=false, handle}). -record(qconn, {id, tag, active=false, handle}).


%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% PUBLIC %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% PUBLIC %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%


start_link() -> new() ->
gen_server:start_link(?MODULE, [], []). Tid = ets:new(qcache_ets, [set, private, {keypos,2}]),
{ok,Tid}.


get_bus(ServerRef, Tuple) when is_tuple(Tuple) -> get_bus(Tid, #qconn{id=Id}) ->
gen_server:call(ServerRef, {get_bus, Tuple}); get_bus(Tid, Id);


get_bus(ServerRef, Exchange) -> get_bus(Tid, PropList) when is_list(PropList) ->
gen_server:call(ServerRef, {get_bus, {id,Exchange}}). get_bus(Tid, to_qconn(PropList));


get_conn(ServerRef, Tuple) when is_tuple(Tuple) -> get_bus(Tid, Id) ->
gen_server:call(ServerRef, {get_conn, Tuple}); case ets:lookup(Tid, Id) of

[] -> not_found;
get_conn(ServerRef, Exchange) -> [#qconn{handle=Handle}] -> Handle
gen_server:call(ServerRef, {get_conn, {id,Exchange}}).

put_conn(ServerRef, PropList) ->
gen_server:cast(ServerRef, {put_conn, PropList}).

put_conns(ServerRef, Conns) when is_list(Conns) ->
gen_server:cast(ServerRef, {put_conns, Conns}).

activate(ServerRef, Tuple) when is_tuple(Tuple) ->
gen_server:cast(ServerRef, {activate, Tuple}).

connections(ServerRef) ->
gen_server:call(ServerRef, connections).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% PRIVATE %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
to_proplist(#qconn{}=QConn) ->
[_Name|Vs] = tuple_to_list(QConn),
Ks = record_info(fields,qconn),
lists:zip(Ks,Vs).

to_qconn(PropList) when is_list(PropList) ->
list_to_tuple([qconn|[?PV(X,PropList) || X <- record_info(fields,qconn)]]).


bus({tag,Tag}, State) ->
case lists:keyfind(Tag,3,State#state.handles) of
false -> not_found;
#qconn{}=QConn -> QConn#qconn.handle
end;

bus({id,Exchange}, State) ->
case lists:keyfind(Exchange,2,State#state.handles) of
false -> not_found;
#qconn{}=QConn -> QConn#qconn.handle
end. end.


get_conn(Tid, {id,Id}) -> get_conn(Tid,Id);


conn({tag,Tag}, State) -> get_conn(Tid, {tag,Tag}) ->
case lists:keyfind(Tag,3,State#state.handles) of case ets:match(Tid, {'_','_',Tag,'_','_'}) of
false -> not_found; [] -> not_found;
#qconn{}=QConn -> to_proplist(QConn) [[#qconn{}=Conn]] -> to_proplist(Conn)
end; end;


conn({id,Exchange}, State) -> get_conn(Tid, #qconn{id=Id}) ->
case lists:keyfind(Exchange,2,State#state.handles) of get_conn(Tid, Id);
false -> not_found;
#qconn{}=QConn -> to_proplist(QConn)
end.



%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% GEN_SERVER %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

init(_) ->
%error_logger:info_msg("[qcache] Starting up~n"),
{ok, #state{}}.


handle_call(connections, _From, State) -> get_conn(Tid, PropList) when is_list(PropList) ->
Handles = lists:map(fun(X) -> to_proplist(X) end, State#state.handles), get_conn(Tid, to_qconn(PropList));
{reply, Handles, State};

handle_call({get_bus,Tuple}, _From, State) when is_tuple(Tuple) ->
BusHandle = bus(Tuple, State),
{reply, BusHandle, State};

handle_call({get_conn,Tuple}, _From, State) when is_tuple(Tuple) ->
Conn = conn(Tuple, State),
{reply, Conn, State}.


get_conn(Tid, Id) ->
case ets:lookup(Tid, Id) of
[] -> not_found;
[#qconn{}=Conn] -> to_proplist(Conn)
end.


handle_cast(stop, State) -> {stop,normal,State}; put_conn(Tid, #qconn{}=Conn) ->
ets:insert(Tid, Conn);


%% This replaces existing handles with the same exchange name. put_conn(Tid, PropList) ->
handle_cast({put_conn,PropList}, State) when is_list(PropList) -> put_conn(Tid, to_qconn(PropList)).
QConn = to_qconn(PropList),
Handles = lists:keystore(QConn#qconn.id, 2, State#state.handles, QConn),
{noreply, State#state{handles=Handles}};


handle_cast({put_conns,Conns}, State) when is_list(Conns) -> put_conns(Tid, Conns) when is_list(Conns) ->
Fn = fun(X, Acc) -> [ put_conn(Tid, Conn) || Conn <- Conns ].
QConn = to_qconn(X),
lists:keystore(QConn#qconn.id, 2, Acc, QConn)
end,
Handles = lists:foldl(Fn, State#state.handles, Conns),
lager:debug("Storing handles:~n ~p", [Handles]),
{noreply, State#state{handles=Handles}};


handle_cast({activate, {tag,Tag}}, State) -> activate(Tid, {id,Id}) ->
H = State#state.handles, case ets:lookup(Tid,Id) of
Handles = case lists:keyfind(Tag,3, H) of [[]] -> ok;
%% TODO Some sort of error needs to be thrown if the Tag doesn't exist [#qconn{}=Conn] -> put_conn(Tid, Conn#qconn{active=true});
false -> H; Other -> error_logger:info_msg("[qcache] Unexpected result: '~p'~n",[Other])
#qconn{}=QConn -> end;
lists:keystore(Tag,3,H,QConn#qconn{active=true})
end,
{noreply, State#state{handles=Handles}};


handle_cast({activate, {id,Exchange}}, State) -> activate(Tid, {tag,Tag}) ->
H = State#state.handles, case ets:match(Tid, {'_','_',Tag,'_','_'}) of
Handles = case lists:keyfind(Exchange,2, H) of [[]] -> ok;
%% TODO Some sort of error needs to be thrown if the Tag doesn't exist [[#qconn{}=Conn]] -> put_conn(Tid, Conn#qconn{active=true});
false -> H; Other -> error_logger:info_msg("[qcache] Unexpected result: '~p'~n",[Other])
#qconn{}=QConn -> end.
lists:keystore(Exchange,2,H,QConn#qconn{active=true})
end,
{noreply, State#state{handles=Handles}}.


connections(Tid) ->
Flat = lists:flatten(ets:match(Tid, '$1')),
[ to_proplist(X) || X <- Flat ].


handle_info(_Info, State) -> {noreply, State}. delete(Tid) ->
ets:delete(Tid).


terminate(_Reason, _State) -> ok. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% PRIVATE %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
to_proplist(#qconn{}=QConn) ->
[_Name|Vs] = tuple_to_list(QConn),
Ks = record_info(fields,qconn),
lists:zip(Ks,Vs).


code_change(_OldVersion, State, _Extra) -> to_qconn(PropList) when is_list(PropList) ->
{ok, State}. list_to_tuple([qconn|[?PV(X,PropList) || X <- record_info(fields,qconn)]]).


2 changes: 1 addition & 1 deletion test/gen_qserver_tests.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -171,6 +171,6 @@ conn_spec_override() ->
get_connection() -> get_connection() ->
Conn = my_qserver:get_connection(), Conn = my_qserver:get_connection(),
Handle = proplists:get_value(handle,Conn), Handle = proplists:get_value(handle,Conn),
?assertEqual(<<"qserver.two">>, proplists:get_value(id,Conn)), ?assertEqual({<<"qserver.two">>,<<"key">>}, proplists:get_value(id,Conn)),
?assertEqual(bus_handle, element(1,Handle)). ?assertEqual(bus_handle, element(1,Handle)).


2 changes: 1 addition & 1 deletion test/my_qserver.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ handle_call({<<B/binary>>, Args}, From, State) ->
handle_call(Args, From, State); handle_call(Args, From, State);


handle_call(connection, From, State) -> handle_call(connection, From, State) ->
handle_call({connection, <<"qserver.two">>}, From, State); handle_call({connection, {<<"qserver.two">>,<<"key">>}}, From, State);


handle_call({connection,X}, _From, State) -> handle_call({connection,X}, _From, State) ->
Conn = qcache:get_conn(State#state.cache_pid, X), Conn = qcache:get_conn(State#state.cache_pid, X),
Expand Down

0 comments on commit 0eef65d

Please sign in to comment.