From 6dd05b9c5cb249bae86a028eb2f25e4f3d7641f0 Mon Sep 17 00:00:00 2001 From: Dave Peticolas Date: Sun, 30 Oct 2011 22:34:12 -0700 Subject: [PATCH] Add support for pubsub. --- .gitignore | 1 + README.md | 4 +- src/eredis.erl | 52 ++++++++++++++++- src/eredis_client.erl | 129 +++++++++++++++++++++++++++++++----------- src/eredis_priv.hrl | 30 ++++++++++ test/eredis_tests.erl | 104 ++++++++++++++++++++++++++++++++++ 6 files changed, 283 insertions(+), 37 deletions(-) create mode 100644 src/eredis_priv.hrl diff --git a/.gitignore b/.gitignore index 3c97588d..a6127fb5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ +tests ebin/* .eunit diff --git a/README.md b/README.md index 86d77332..099e088d 100644 --- a/README.md +++ b/README.md @@ -2,8 +2,8 @@ Non-blocking Redis client with a focus on performance and robustness. -It supports authentication, choosing a specific database, transactions -and pipelining. +It supports authentication, choosing a specific database, transactions, +pipelining, and pubsub. ## Example diff --git a/src/eredis.erl b/src/eredis.erl index c72678b1..0649837a 100644 --- a/src/eredis.erl +++ b/src/eredis.erl @@ -16,7 +16,9 @@ -define(TIMEOUT, 5000). -export([start_link/0, start_link/1, start_link/2, start_link/3, start_link/4, - start_link/5, q/2, q/3, qp/2, qp/3]). + start_link/5, q/2, q/3, qp/2, qp/3, + controlling_process/1, controlling_process/2, controlling_process/3, + ack_message/1]). %% Exported for testing -export([create_multibulk/1]). @@ -84,6 +86,54 @@ qp(Client, Pipeline, Timeout) -> pipeline(Client, Pipeline, Timeout). +-spec controlling_process(Client::pid()) -> ok. +%% @doc: Make the calling process the controlling process. The +%% controlling process received pubsub-related messages, of which +%% there are three kinds. In each message, the pid refers to the +%% eredis client process. +%% +%% {message, Channel::binary(), Message::binary(), pid()} +%% This is sent for each pubsub message received by the client. +%% +%% {eredis_disconnected, pid()} +%% This is sent when the eredis client is disconnected from redis. +%% +%% {eredis_connected, pid()} +%% This is sent when the eredis client reconnects to redis after +%% an existing connection was disconnected. +%% +%% Note that you must still issue SUBSCRIBE or PSUBSCRIBE redis +%% commands to receive pubsub messages. Also, once you issue a +%% SUBSCRIBE or PSUBSCRIBE command, that eredis client may only be +%% used to add or remove pubsub subscriptions and to receive pubsub +%% messages. That is how Redis pubsub works, it is not an artifact +%% of eredis. +%% +%% Any message of the form {message, _, _, _} must be acknowledged +%% before any subsequent message of the same form is sent. This +%% prevents the controlling process from being overrun with redis +%% pubsub messages. See ack_message/2 below. +controlling_process(Client) -> + controlling_process(Client, self()). + +-spec controlling_process(Client::pid(), Pid::pid()) -> ok. +%% @doc: Make the given process (pid) the controlling process. +controlling_process(Client, Pid) -> + controlling_process(Client, Pid, ?TIMEOUT). + +%% @doc: Make the given process (pid) the controlling process subscriber +%% with the given Timeout. +controlling_process(Client, Pid, Timeout) -> + gen_server:call(Client, {controlling_process, Pid}, Timeout). + + +-spec ack_message(Client::pid()) -> ok. +%% @doc: acknowledge the receipt of a pubsub message. each pubsub +%% message must be acknowledged before the next one is received +ack_message(Client) -> + gen_server:cast(Client, {ack_message, self()}). + + %% %% INTERNAL HELPERS %% diff --git a/src/eredis_client.erl b/src/eredis_client.erl index 80882cb6..d02774a3 100644 --- a/src/eredis_client.erl +++ b/src/eredis_client.erl @@ -7,7 +7,7 @@ %% %% The client works like this: %% * When starting up, we connect to Redis with the given connection -%% information, or fail. +%% information, or fail. %% * Users calls us using gen_server:call, we send the request to Redis, %% add the calling process at the end of the queue and reply with %% noreply. We are then free to handle new requests and may reply to @@ -19,13 +19,15 @@ %% * For pipeline commands, we include the number of responses we are %% waiting for in each element of the queue. Responses are queued until %% we have all the responses we need and then reply with all of them. +%% * Redis pubsub messages are handled by sending Erlang messages to +%% all registered subscribers. %% -module(eredis_client). -author('knut.nesheim@wooga.com'). -behaviour(gen_server). --include("eredis.hrl"). +-include("eredis_priv.hrl"). %% API -export([start_link/5, stop/1, select_database/2]). @@ -34,18 +36,6 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, { - host :: string() | undefined, - port :: integer() | undefined, - password :: binary() | undefined, - database :: binary() | undefined, - reconnect_sleep :: integer() | undefined, - - socket :: port() | undefined, - parser_state :: #pstate{} | undefined, - queue :: queue() | undefined -}). - -define(SOCKET_OPTS, [binary, {active, once}, {packet, raw}, {reuseaddr, true}]). %% @@ -77,7 +67,8 @@ init([Host, Port, Database, Password, ReconnectSleep]) -> reconnect_sleep = ReconnectSleep, parser_state = eredis_parser:init(), - queue = queue:new()}, + queue = queue:new(), + msg_queue = queue:new()}, case connect(State) of {ok, NewState} -> @@ -92,6 +83,9 @@ handle_call({request, Req}, From, State) -> handle_call({pipeline, Pipeline}, From, State) -> do_pipeline(Pipeline, From, State); +handle_call({controlling_process, Pid}, _From, State) -> + do_controlling_process(Pid, State); + handle_call(stop, _From, State) -> {stop, normal, ok, State}; @@ -99,13 +93,19 @@ handle_call(_Request, _From, State) -> {reply, unknown_request, State}. +handle_cast({ack_message, Pid}, + #state{controlling_process={_, Pid}} = State) -> + {noreply, maybe_send_message(State#state{msg_state=ready})}; +handle_cast({ack_message, _}, State) -> + {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. + %% Receive data from socket, see handle_response/2 handle_info({tcp, _Socket, Bs}, State) -> - inet:setopts(State#state.socket, [{active, once}]), - {noreply, handle_response(Bs, State)}; + NewState = State#state{conn_state=passive}, + {noreply, handle_response(Bs, update_socket_state(NewState))}; %% Socket got closed, for example by Redis terminating idle %% clients. Spawn of a new process which will try to reconnect and @@ -113,6 +113,7 @@ handle_info({tcp, _Socket, Bs}, State) -> %% an error message to all our clients. handle_info({tcp_closed, _Socket}, State) -> Self = self(), + send_to_controller({eredis_disconnected, Self}, State), spawn(fun() -> reconnect_loop(Self, State) end), %% Throw away the socket and the queue, as we will never get a @@ -123,7 +124,15 @@ handle_info({tcp_closed, _Socket}, State) -> %% Redis is ready to accept requests, the given Socket is a socket %% already connected and authenticated. handle_info({connection_ready, Socket}, #state{socket = undefined} = State) -> - {noreply, State#state{socket = Socket}}; + send_to_controller({eredis_connected, self()}, State), + {noreply, State#state{socket = Socket, conn_state = active_once}}; + +%% Our controlling process is down. +handle_info({'DOWN', Ref, process, Pid, _Reason}, + #state{controlling_process={Ref, Pid}} = State) -> + {stop, shutdown, State#state{controlling_process=undefined, + msg_state=ready, + msg_queue=queue:new()}}; %% eredis can be used in Poolboy, but it requires to support a simple API %% that Poolboy uses to manage the connections. @@ -158,7 +167,7 @@ do_request(Req, From, State) -> case gen_tcp:send(State#state.socket, Req) of ok -> NewQueue = queue:in({1, From}, State#state.queue), - {noreply, State#state{queue = NewQueue}}; + {noreply, update_socket_state(State#state{queue = NewQueue})}; {error, Reason} -> {reply, {error, Reason}, State} end. @@ -174,31 +183,38 @@ do_pipeline(Pipeline, From, State) -> case gen_tcp:send(State#state.socket, Pipeline) of ok -> NewQueue = queue:in({length(Pipeline), From, []}, State#state.queue), - {noreply, State#state{queue = NewQueue}}; + {noreply, update_socket_state(State#state{queue = NewQueue})}; {error, Reason} -> {reply, {error, Reason}, State} end. +-spec do_controlling_process(Pid::pid(), #state{}) -> {reply, Reply::{ok, reference()}, #state{}}. +do_controlling_process(Pid, State) -> + case State#state.controlling_process of + undefined -> + ok; + {OldRef, _OldPid} -> + erlang:demonitor(OldRef) + end, + Ref = erlang:monitor(process, Pid), + {reply, ok, State#state{controlling_process={Ref, Pid}}}. + -spec handle_response(Data::binary(), State::#state{}) -> NewState::#state{}. %% @doc: Handle the response coming from Redis. This includes parsing %% and replying to the correct client, handling partial responses, %% handling too much data and handling continuations. -handle_response(Data, #state{parser_state = ParserState, - queue = Queue} = State) -> - +handle_response(Data, #state{parser_state = ParserState} = State) -> case eredis_parser:parse(ParserState, Data) of %% Got complete response, return value to client {ReturnCode, Value, NewParserState} -> - NewQueue = reply({ReturnCode, Value}, Queue), - State#state{parser_state = NewParserState, - queue = NewQueue}; + reply({ReturnCode, Value}, State#state{parser_state=NewParserState}); %% Got complete response, with extra data, reply to client and %% recurse over the extra data {ReturnCode, Value, Rest, NewParserState} -> - NewQueue = reply({ReturnCode, Value}, Queue), - handle_response(Rest, State#state{parser_state = NewParserState, - queue = NewQueue}); + NewState = reply({ReturnCode, Value}, + State#state{parser_state=NewParserState}), + handle_response(Rest, NewState); %% Parser needs more data, the parser state now contains the %% continuation data and we will try calling parse again when @@ -210,17 +226,24 @@ handle_response(Data, #state{parser_state = ParserState, %% @doc: Sends a value to the first client in queue. Returns the new %% queue without this client. If we are still waiting for parts of a %% pipelined request, push the reply to the the head of the queue and -%% wait for another reply from redis. -reply(Value, Queue) -> +%% wait for another reply from redis. Pubsub messages are not part of +%% the normal reply stream and will instead be sent as Erlang messages +%% to the controlling process (if any). +reply({ok, [<<"message">>, Channel, Message]}, State) -> + Msg = {message, Channel, Message, self()}, + MsgQueue = queue:in(Msg, State#state.msg_queue), + maybe_send_message(State#state{msg_queue=MsgQueue}); +reply(Value, #state{queue=Queue} = State) -> case queue:out(Queue) of {{value, {1, From}}, NewQueue} -> gen_server:reply(From, Value), - NewQueue; + State#state{queue=NewQueue}; {{value, {1, From, Replies}}, NewQueue} -> gen_server:reply(From, lists:reverse([Value | Replies])), - NewQueue; + State#state{queue=NewQueue}; {{value, {N, From, Replies}}, NewQueue} when N > 1 -> - queue:in_r({N - 1, From, [Value | Replies]}, NewQueue); + State#state{queue=queue:in_r({N - 1, From, [Value | Replies]}, + NewQueue)}; {empty, Queue} -> %% Oops error_logger:info_msg("Nothing in queue, but got value from parser~n"), @@ -294,3 +317,41 @@ reconnect_loop(Client, #state{reconnect_sleep=ReconnectSleep}=State) -> timer:sleep(ReconnectSleep), reconnect_loop(Client, State) end. + + +send_to_controller(_Msg, #state{controlling_process=undefined}) -> + ok; +send_to_controller(Msg, #state{controlling_process={_Ref, Pid}}) -> + Pid ! Msg. + + +maybe_send_message(#state{controlling_process=undefined} = State) -> + State#state{msg_queue=queue:new()}; +maybe_send_message(#state{msg_state=need_ack} = State) -> + State; +maybe_send_message(State) -> + case queue:out(State#state.msg_queue) of + {empty, _Queue} -> + State; + {{value, Msg}, Queue} -> + send_to_controller(Msg, State), + State#state{msg_queue=Queue, msg_state=need_ack} + end. + + +update_socket_state(#state{conn_state=active_once} = State) -> + State; +update_socket_state(#state{controlling_process=undefined} = State) -> + inet:setopts(State#state.socket, [{active, once}]), + State#state{conn_state=active_once}; +update_socket_state(#state{msg_state=ready} = State) -> + inet:setopts(State#state.socket, [{active, once}]), + State#state{conn_state=active_once}; +update_socket_state(State) -> + case queue:is_empty(State#state.queue) of + true -> + State; + false -> + inet:setopts(State#state.socket, [{active, once}]), + State#state{conn_state=active_once} + end. diff --git a/src/eredis_priv.hrl b/src/eredis_priv.hrl new file mode 100644 index 00000000..1063eb90 --- /dev/null +++ b/src/eredis_priv.hrl @@ -0,0 +1,30 @@ +-include("eredis.hrl"). + +-record(state, { + host :: string() | undefined, + port :: integer() | undefined, + password :: binary() | undefined, + database :: binary() | undefined, + reconnect_sleep :: integer() | undefined, + + socket :: port() | undefined, + parser_state :: #pstate{} | undefined, + queue :: queue() | undefined, + + % The process we send pubsub and connection state messages to. + controlling_process :: undefined | {reference(), pid()}, + + % This is the queue of messages to send to the controlling + % process. + msg_queue :: queue(), + + % The msg_state keeps track of whether we are waiting + % for the controlling process to acknowledge the last + % message. + msg_state = ready :: ready | need_ack, + + % The conn_state keeps track of whether we have set the + % tcp connection to {active, once} after receiving some + % tcp data. + conn_state = passive :: passive | active_once +}). diff --git a/test/eredis_tests.erl b/test/eredis_tests.erl index 62bfd192..7ca02d22 100644 --- a/test/eredis_tests.erl +++ b/test/eredis_tests.erl @@ -1,6 +1,7 @@ -module(eredis_tests). -include_lib("eunit/include/eunit.hrl"). +-include("../src/eredis_priv.hrl"). -import(eredis, [create_multibulk/1]). @@ -121,3 +122,106 @@ multibulk_test_() -> ?_assertThrow({cannot_store_floats, 123.5}, list_to_binary(create_multibulk(['SET', foo, 123.5]))) ]. + + + +pubsub_test() -> + Pub = c(), + Sub = c(), + ok = eredis:controlling_process(Sub), + Res1 = eredis:q(Sub, ["SUBSCRIBE", chan]), + ?assertEqual({ok, [<<"subscribe">>, <<"chan">>, <<"1">>]}, Res1), + Res2 = eredis:q(Pub, ["PUBLISH", chan, msg]), + ?assertEqual({ok, <<"1">>}, Res2), + Msg = receive + {message, _, _, _} = InMsg -> + InMsg + end, + ?assertEqual({message, <<"chan">>, <<"msg">>, Sub}, Msg). + + +pubsub_manage_subscribers_test() -> + Pub = c(), + Sub = c(), + unlink(Sub), + eredis:q(Sub, ["SUBSCRIBE", chan]), + #state{controlling_process=undefined} = get_state(Sub), + S1 = subscriber(Sub), + ok = eredis:controlling_process(Sub, S1), + #state{controlling_process={_, S1}} = get_state(Sub), + S2 = subscriber(Sub), + ok = eredis:controlling_process(Sub, S2), + #state{controlling_process={_, S2}} = get_state(Sub), + eredis:q(Pub, ["PUBLISH", chan, msg1]), + S1 ! stop, + ok = wait_for_stop(S1), + eredis:q(Pub, ["PUBLISH", chan, msg2]), + M2 = wait_for_msg(S2), + ?assertEqual(M2, {message, <<"chan">>, <<"msg1">>, Sub}), + M3 = wait_for_msg(S2), + ?assertEqual(M3, {message, <<"chan">>, <<"msg2">>, Sub}), + S2 ! stop, + ok = wait_for_stop(S2), + Ref = erlang:monitor(process, Sub), + receive {'DOWN', Ref, process, Sub, _} -> ok end. + + +pubsub_connect_disconnect_messages_test() -> + Pub = c(), + Sub = c(), + eredis:q(Sub, ["SUBSCRIBE", chan]), + S = subscriber(Sub), + ok = eredis:controlling_process(Sub, S), + eredis:q(Pub, ["PUBLISH", chan, msg]), + wait_for_msg(S), + #state{socket=Sock} = get_state(Sub), + gen_tcp:close(Sock), + Sub ! {tcp_closed, Sock}, + M1 = wait_for_msg(S), + ?assertEqual({eredis_disconnected, Sub}, M1), + M2 = wait_for_msg(S), + ?assertEqual({eredis_connected, Sub}, M2). + + +subscriber(Client) -> + Test = self(), + Pid = spawn(fun () -> subscriber(Client, Test) end), + spawn(fun() -> + Ref = erlang:monitor(process, Pid), + receive + {'DOWN', Ref, _, _, _} -> + Test ! {stopped, Pid} + end + end), + Pid. + +subscriber(Client, Test) -> + receive + stop -> + ok; + Msg -> + Test ! {got_message, self(), Msg}, + eredis:ack_message(Client), + subscriber(Client, Test) + end. + +wait_for_msg(Subscriber) -> + receive + {got_message, Subscriber, Msg} -> + Msg + end. + +wait_for_stop(Subscriber) -> + receive + {stopped, Subscriber} -> + ok + end. + +get_state(Pid) + when is_pid(Pid) -> + {status, _, _, [_, _, _, _, State]} = sys:get_status(Pid), + get_state(State); +get_state([{data, [{"State", State}]} | _]) -> + State; +get_state([_|Rest]) -> + get_state(Rest).