Skip to content

Commit

Permalink
API for subscribing and unsubscribing at runtime. Will send result of…
Browse files Browse the repository at this point in the history
… subscription to the controlling process.
  • Loading branch information
knutin committed Jan 18, 2012
1 parent b8e64b6 commit 20a72f9
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 7 deletions.
27 changes: 26 additions & 1 deletion src/eredis_sub.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

-export([start_link/1, start_link/4, start_link/7, stop/1, receiver/1, sub_test/0,
controlling_process/1, controlling_process/2, controlling_process/3,
ack_message/1]).
ack_message/1, subscribe/2, unsubscribe/2, channels/1]).


%%
Expand Down Expand Up @@ -67,6 +67,12 @@ stop(Pid) ->
%% and the behaviour is to drop messages, this message is sent when
%% the queue is flushed.
%%
%% {subscribed, Channel::binary(), pid()}
%% When using eredis_sub:subscribe(pid()), this message will be
%% sent for each channel Redis aknowledges the subscription. The
%% opposite, 'unsubscribed' is sent when Redis aknowledges removal
%% of a subscription.
%%
%% {eredis_disconnected, pid()}
%% This is sent when the eredis client is disconnected from redis.
%%
Expand Down Expand Up @@ -99,6 +105,25 @@ ack_message(Client) ->
gen_server:cast(Client, {ack_message, self()}).


%% @doc: Subscribe to the given channels. Returns immediately. The
%% result will be delivered to the controlling process as any other
%% message. Delivers {subscribed, Channel::binary(), pid()}
-spec subscribe(pid(), [channel()]) -> ok.
subscribe(Client, Channels) ->
gen_server:cast(Client, {subscribe, self(), Channels}).

unsubscribe(Client, Channels) ->
gen_server:cast(Client, {unsubscribe, self(), Channels}).

%% @doc: Returns the channels the given client is currently
%% subscribing to. Note: this list is based on the channels at startup
%% and any channel added during runtime. It might not immediately
%% reflect the channels Redis thinks the client is subscribed to.
channels(Client) ->
gen_server:call(Client, get_channels).



%%
%% STUFF FOR TRYING OUT PUBSUB
%%
Expand Down
29 changes: 24 additions & 5 deletions src/eredis_sub_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ handle_call({controlling_process, Pid}, _From, State) ->
Ref = erlang:monitor(process, Pid),
{reply, ok, State#state{controlling_process={Ref, Pid}, msg_state = ready}};

handle_call(get_channels, _From, State) ->
{reply, {ok, State#state.channels}, State};


handle_call(stop, _From, State) ->
{stop, normal, ok, State};
Expand All @@ -97,7 +100,6 @@ handle_cast({ack_message, Pid},
%% socket.
handle_cast({ack_message, Pid},
#state{controlling_process={_, Pid}} = State) ->

NewState = case queue:out(State#state.msg_queue) of
{empty, _Queue} ->
State#state{msg_state = ready};
Expand All @@ -107,6 +109,18 @@ handle_cast({ack_message, Pid},
end,
{noreply, NewState};

handle_cast({subscribe, Pid, Channels}, #state{controlling_process = {_, Pid}} = State) ->
Command = eredis:create_multibulk(["SUBSCRIBE" | Channels]),
ok = gen_tcp:send(State#state.socket, Command),
{noreply, State#state{channels = Channels ++ State#state.channels}};

handle_cast({unsubscribe, Pid, Channels}, #state{controlling_process = {_, Pid}} = State) ->
Command = eredis:create_multibulk(["UNSUBSCRIBE" | Channels]),
ok = gen_tcp:send(State#state.socket, Command),
NewChannels = lists:foldl(fun (C, Cs) -> lists:delete(C, Cs) end,
State#state.channels, Channels),
{noreply, State#state{channels = NewChannels}};

handle_cast({ack_message, _}, State) ->
{noreply, State};

Expand Down Expand Up @@ -206,18 +220,23 @@ handle_response(Data, #state{parser_state = ParserState} = State) ->
%% acknowledged the previous process, otherwise the message is queued
%% for later delivery.
reply({ok, [<<"message">>, Channel, Message]}, State) ->
Msg = {message, Channel, Message, self()},
queue_or_send({message, Channel, Message, self()}, State);
reply({ok, [<<"subscribe">>, Channel, _]}, State) ->
queue_or_send({subscribed, Channel, self()}, State);
reply({ok, [<<"unsubscribe">>, Channel, _]}, State) ->
queue_or_send({unsubscribed, Channel, self()}, State);
reply({ReturnCode, Value}, State) ->
throw({unexpected_response_from_redis, ReturnCode, Value, State}).

queue_or_send(Msg, State) ->
case State#state.msg_state of
need_ack ->
MsgQueue = queue:in(Msg, State#state.msg_queue),
State#state{msg_queue = MsgQueue};
ready ->
send_to_controller(Msg, State),
State#state{msg_state = need_ack}
end;
reply({ReturnCode, Value}, State) ->
throw({unexpected_response_from_redis, ReturnCode, Value, State}).
end.


%% @doc: Helper for connecting to Redis. These commands are
Expand Down
35 changes: 34 additions & 1 deletion test/eredis_sub_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ drop_queue_test() ->

crash_queue_test() ->
Pub = c(),
process_flag(trap_exit, true),
{ok, Sub} = eredis_sub:start_link("127.0.0.1", 6379, "", [<<"foo">>], 100,
10, exit),

true = unlink(Sub),
ok = eredis_sub:controlling_process(Sub),
Ref = erlang:monitor(process, Sub),
Expand All @@ -120,6 +120,39 @@ crash_queue_test() ->
receive M2 -> ?assertEqual({'DOWN', Ref, process, Sub, max_queue_size}, M2) end.



dynamic_channels_test() ->
Pub = c(),
Sub = s([<<"foo">>]),
ok = eredis_sub:controlling_process(Sub),

eredis:q(Pub, [publish, newchan, foo]),

receive {message, <<"foo">>, _, _} -> ?assert(false)
after 5 -> ok end,

eredis_sub:subscribe(Sub, [<<"newchan">>, <<"otherchan">>]),
receive M1 -> ?assertEqual({subscribed, <<"newchan">>, Sub}, M1) end,
eredis_sub:ack_message(Sub),
receive M2 -> ?assertEqual({subscribed, <<"otherchan">>, Sub}, M2) end,
eredis_sub:ack_message(Sub),
?assertEqual({ok, [<<"newchan">>, <<"otherchan">>, <<"foo">>]},
eredis_sub:channels(Sub)),

eredis:q(Pub, [publish, newchan, foo]),
?assertEqual([{message, <<"newchan">>, <<"foo">>, Sub}], recv_all(Sub)),
eredis:q(Pub, [publish, otherchan, foo]),
?assertEqual([{message, <<"otherchan">>, <<"foo">>, Sub}], recv_all(Sub)),

eredis_sub:unsubscribe(Sub, [<<"otherchan">>, <<"foo">>]),
eredis_sub:ack_message(Sub),
receive M3 -> ?assertEqual({unsubscribed, <<"otherchan">>, Sub}, M3) end,
eredis_sub:ack_message(Sub),
receive M4 -> ?assertEqual({unsubscribed, <<"foo">>, Sub}, M4) end,

?assertEqual({ok, [<<"newchan">>]}, eredis_sub:channels(Sub)).


recv_all(Sub) ->
recv_all(Sub, []).

Expand Down

0 comments on commit 20a72f9

Please sign in to comment.