Skip to content

Commit

Permalink
Merge branch 'pubsub-2' into pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
Knut Nesheim committed Dec 2, 2011
2 parents 1289cda + 6dd05b9 commit f1401dc
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 37 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,2 +1,3 @@
tests
ebin/*
.eunit
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -2,8 +2,8 @@

Non-blocking Redis client with a focus on performance and robustness.

It supports authentication, choosing a specific database, transactions
and pipelining.
It supports authentication, choosing a specific database, transactions,
pipelining, and pubsub.

## Example

Expand Down
52 changes: 51 additions & 1 deletion src/eredis.erl
Expand Up @@ -16,7 +16,9 @@
-define(TIMEOUT, 5000).

-export([start_link/0, start_link/1, start_link/2, start_link/3, start_link/4,
start_link/5, q/2, q/3, qp/2, qp/3]).
start_link/5, q/2, q/3, qp/2, qp/3,
controlling_process/1, controlling_process/2, controlling_process/3,
ack_message/1]).

%% Exported for testing
-export([create_multibulk/1]).
Expand Down Expand Up @@ -84,6 +86,54 @@ qp(Client, Pipeline, Timeout) ->
pipeline(Client, Pipeline, Timeout).


-spec controlling_process(Client::pid()) -> ok.
%% @doc: Make the calling process the controlling process. The
%% controlling process received pubsub-related messages, of which
%% there are three kinds. In each message, the pid refers to the
%% eredis client process.
%%
%% {message, Channel::binary(), Message::binary(), pid()}
%% This is sent for each pubsub message received by the client.
%%
%% {eredis_disconnected, pid()}
%% This is sent when the eredis client is disconnected from redis.
%%
%% {eredis_connected, pid()}
%% This is sent when the eredis client reconnects to redis after
%% an existing connection was disconnected.
%%
%% Note that you must still issue SUBSCRIBE or PSUBSCRIBE redis
%% commands to receive pubsub messages. Also, once you issue a
%% SUBSCRIBE or PSUBSCRIBE command, that eredis client may only be
%% used to add or remove pubsub subscriptions and to receive pubsub
%% messages. That is how Redis pubsub works, it is not an artifact
%% of eredis.
%%
%% Any message of the form {message, _, _, _} must be acknowledged
%% before any subsequent message of the same form is sent. This
%% prevents the controlling process from being overrun with redis
%% pubsub messages. See ack_message/2 below.
controlling_process(Client) ->
controlling_process(Client, self()).

-spec controlling_process(Client::pid(), Pid::pid()) -> ok.
%% @doc: Make the given process (pid) the controlling process.
controlling_process(Client, Pid) ->
controlling_process(Client, Pid, ?TIMEOUT).

%% @doc: Make the given process (pid) the controlling process subscriber
%% with the given Timeout.
controlling_process(Client, Pid, Timeout) ->
gen_server:call(Client, {controlling_process, Pid}, Timeout).


-spec ack_message(Client::pid()) -> ok.
%% @doc: acknowledge the receipt of a pubsub message. each pubsub
%% message must be acknowledged before the next one is received
ack_message(Client) ->
gen_server:cast(Client, {ack_message, self()}).


%%
%% INTERNAL HELPERS
%%
Expand Down
129 changes: 95 additions & 34 deletions src/eredis_client.erl
Expand Up @@ -7,7 +7,7 @@
%%
%% The client works like this:
%% * When starting up, we connect to Redis with the given connection
%% information, or fail.
%% information, or fail.
%% * Users calls us using gen_server:call, we send the request to Redis,
%% add the calling process at the end of the queue and reply with
%% noreply. We are then free to handle new requests and may reply to
Expand All @@ -19,13 +19,15 @@
%% * For pipeline commands, we include the number of responses we are
%% waiting for in each element of the queue. Responses are queued until
%% we have all the responses we need and then reply with all of them.
%% * Redis pubsub messages are handled by sending Erlang messages to
%% all registered subscribers.
%%
-module(eredis_client).
-author('knut.nesheim@wooga.com').

-behaviour(gen_server).

-include("eredis.hrl").
-include("eredis_priv.hrl").

%% API
-export([start_link/5, stop/1, select_database/2]).
Expand All @@ -34,18 +36,6 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).

-record(state, {
host :: string() | undefined,
port :: integer() | undefined,
password :: binary() | undefined,
database :: binary() | undefined,
reconnect_sleep :: integer() | undefined,

socket :: port() | undefined,
parser_state :: #pstate{} | undefined,
queue :: queue() | undefined
}).

-define(SOCKET_OPTS, [binary, {active, once}, {packet, raw}, {reuseaddr, true}]).

%%
Expand Down Expand Up @@ -77,7 +67,8 @@ init([Host, Port, Database, Password, ReconnectSleep]) ->
reconnect_sleep = ReconnectSleep,

parser_state = eredis_parser:init(),
queue = queue:new()},
queue = queue:new(),
msg_queue = queue:new()},

case connect(State) of
{ok, NewState} ->
Expand All @@ -92,27 +83,37 @@ handle_call({request, Req}, From, State) ->
handle_call({pipeline, Pipeline}, From, State) ->
do_pipeline(Pipeline, From, State);

handle_call({controlling_process, Pid}, _From, State) ->
do_controlling_process(Pid, State);

handle_call(stop, _From, State) ->
{stop, normal, ok, State};

handle_call(_Request, _From, State) ->
{reply, unknown_request, State}.


handle_cast({ack_message, Pid},
#state{controlling_process={_, Pid}} = State) ->
{noreply, maybe_send_message(State#state{msg_state=ready})};
handle_cast({ack_message, _}, State) ->
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.


%% Receive data from socket, see handle_response/2
handle_info({tcp, _Socket, Bs}, State) ->
inet:setopts(State#state.socket, [{active, once}]),
{noreply, handle_response(Bs, State)};
NewState = State#state{conn_state=passive},
{noreply, handle_response(Bs, update_socket_state(NewState))};

%% Socket got closed, for example by Redis terminating idle
%% clients. Spawn of a new process which will try to reconnect and
%% notify us when Redis is ready. In the meantime, we can respond with
%% an error message to all our clients.
handle_info({tcp_closed, _Socket}, State) ->
Self = self(),
send_to_controller({eredis_disconnected, Self}, State),
spawn(fun() -> reconnect_loop(Self, State) end),

%% Throw away the socket and the queue, as we will never get a
Expand All @@ -123,7 +124,15 @@ handle_info({tcp_closed, _Socket}, State) ->
%% Redis is ready to accept requests, the given Socket is a socket
%% already connected and authenticated.
handle_info({connection_ready, Socket}, #state{socket = undefined} = State) ->
{noreply, State#state{socket = Socket}};
send_to_controller({eredis_connected, self()}, State),
{noreply, State#state{socket = Socket, conn_state = active_once}};

%% Our controlling process is down.
handle_info({'DOWN', Ref, process, Pid, _Reason},
#state{controlling_process={Ref, Pid}} = State) ->
{stop, shutdown, State#state{controlling_process=undefined,
msg_state=ready,
msg_queue=queue:new()}};

%% eredis can be used in Poolboy, but it requires to support a simple API
%% that Poolboy uses to manage the connections.
Expand Down Expand Up @@ -158,7 +167,7 @@ do_request(Req, From, State) ->
case gen_tcp:send(State#state.socket, Req) of
ok ->
NewQueue = queue:in({1, From}, State#state.queue),
{noreply, State#state{queue = NewQueue}};
{noreply, update_socket_state(State#state{queue = NewQueue})};
{error, Reason} ->
{reply, {error, Reason}, State}
end.
Expand All @@ -174,31 +183,38 @@ do_pipeline(Pipeline, From, State) ->
case gen_tcp:send(State#state.socket, Pipeline) of
ok ->
NewQueue = queue:in({length(Pipeline), From, []}, State#state.queue),
{noreply, State#state{queue = NewQueue}};
{noreply, update_socket_state(State#state{queue = NewQueue})};
{error, Reason} ->
{reply, {error, Reason}, State}
end.

-spec do_controlling_process(Pid::pid(), #state{}) -> {reply, Reply::{ok, reference()}, #state{}}.
do_controlling_process(Pid, State) ->
case State#state.controlling_process of
undefined ->
ok;
{OldRef, _OldPid} ->
erlang:demonitor(OldRef)
end,
Ref = erlang:monitor(process, Pid),
{reply, ok, State#state{controlling_process={Ref, Pid}}}.

-spec handle_response(Data::binary(), State::#state{}) -> NewState::#state{}.
%% @doc: Handle the response coming from Redis. This includes parsing
%% and replying to the correct client, handling partial responses,
%% handling too much data and handling continuations.
handle_response(Data, #state{parser_state = ParserState,
queue = Queue} = State) ->

handle_response(Data, #state{parser_state = ParserState} = State) ->
case eredis_parser:parse(ParserState, Data) of
%% Got complete response, return value to client
{ReturnCode, Value, NewParserState} ->
NewQueue = reply({ReturnCode, Value}, Queue),
State#state{parser_state = NewParserState,
queue = NewQueue};
reply({ReturnCode, Value}, State#state{parser_state=NewParserState});

%% Got complete response, with extra data, reply to client and
%% recurse over the extra data
{ReturnCode, Value, Rest, NewParserState} ->
NewQueue = reply({ReturnCode, Value}, Queue),
handle_response(Rest, State#state{parser_state = NewParserState,
queue = NewQueue});
NewState = reply({ReturnCode, Value},
State#state{parser_state=NewParserState}),
handle_response(Rest, NewState);

%% Parser needs more data, the parser state now contains the
%% continuation data and we will try calling parse again when
Expand All @@ -210,17 +226,24 @@ handle_response(Data, #state{parser_state = ParserState,
%% @doc: Sends a value to the first client in queue. Returns the new
%% queue without this client. If we are still waiting for parts of a
%% pipelined request, push the reply to the the head of the queue and
%% wait for another reply from redis.
reply(Value, Queue) ->
%% wait for another reply from redis. Pubsub messages are not part of
%% the normal reply stream and will instead be sent as Erlang messages
%% to the controlling process (if any).
reply({ok, [<<"message">>, Channel, Message]}, State) ->
Msg = {message, Channel, Message, self()},
MsgQueue = queue:in(Msg, State#state.msg_queue),
maybe_send_message(State#state{msg_queue=MsgQueue});
reply(Value, #state{queue=Queue} = State) ->
case queue:out(Queue) of
{{value, {1, From}}, NewQueue} ->
gen_server:reply(From, Value),
NewQueue;
State#state{queue=NewQueue};
{{value, {1, From, Replies}}, NewQueue} ->
gen_server:reply(From, lists:reverse([Value | Replies])),
NewQueue;
State#state{queue=NewQueue};
{{value, {N, From, Replies}}, NewQueue} when N > 1 ->
queue:in_r({N - 1, From, [Value | Replies]}, NewQueue);
State#state{queue=queue:in_r({N - 1, From, [Value | Replies]},
NewQueue)};
{empty, Queue} ->
%% Oops
error_logger:info_msg("Nothing in queue, but got value from parser~n"),
Expand Down Expand Up @@ -294,3 +317,41 @@ reconnect_loop(Client, #state{reconnect_sleep=ReconnectSleep}=State) ->
timer:sleep(ReconnectSleep),
reconnect_loop(Client, State)
end.


send_to_controller(_Msg, #state{controlling_process=undefined}) ->
ok;
send_to_controller(Msg, #state{controlling_process={_Ref, Pid}}) ->
Pid ! Msg.


maybe_send_message(#state{controlling_process=undefined} = State) ->
State#state{msg_queue=queue:new()};
maybe_send_message(#state{msg_state=need_ack} = State) ->
State;
maybe_send_message(State) ->
case queue:out(State#state.msg_queue) of
{empty, _Queue} ->
State;
{{value, Msg}, Queue} ->
send_to_controller(Msg, State),
State#state{msg_queue=Queue, msg_state=need_ack}
end.


update_socket_state(#state{conn_state=active_once} = State) ->
State;
update_socket_state(#state{controlling_process=undefined} = State) ->
inet:setopts(State#state.socket, [{active, once}]),
State#state{conn_state=active_once};
update_socket_state(#state{msg_state=ready} = State) ->
inet:setopts(State#state.socket, [{active, once}]),
State#state{conn_state=active_once};
update_socket_state(State) ->
case queue:is_empty(State#state.queue) of
true ->
State;
false ->
inet:setopts(State#state.socket, [{active, once}]),
State#state{conn_state=active_once}
end.
30 changes: 30 additions & 0 deletions src/eredis_priv.hrl
@@ -0,0 +1,30 @@
-include("eredis.hrl").

-record(state, {
host :: string() | undefined,
port :: integer() | undefined,
password :: binary() | undefined,
database :: binary() | undefined,
reconnect_sleep :: integer() | undefined,

socket :: port() | undefined,
parser_state :: #pstate{} | undefined,
queue :: queue() | undefined,

% The process we send pubsub and connection state messages to.
controlling_process :: undefined | {reference(), pid()},

% This is the queue of messages to send to the controlling
% process.
msg_queue :: queue(),

% The msg_state keeps track of whether we are waiting
% for the controlling process to acknowledge the last
% message.
msg_state = ready :: ready | need_ack,

% The conn_state keeps track of whether we have set the
% tcp connection to {active, once} after receiving some
% tcp data.
conn_state = passive :: passive | active_once
}).

0 comments on commit f1401dc

Please sign in to comment.