Skip to content

Commit

Permalink
When the connection to Redis is lost, try to reconnect while short-ci…
Browse files Browse the repository at this point in the history
…rcuiting the responses to clients to return errors.
  • Loading branch information
Knut Nesheim committed Apr 28, 2011
1 parent a8100f1 commit 7fe20d7
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 25 deletions.
20 changes: 14 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,21 @@ To start the client, use `eredis:start_link/0` or
* Password, string or empty string([]) for no password
* Database, integer or 0 for default database

## Reconnecting on time out
## Reconnecting on Redis down / network failure / timeout / etc

Redis will disconnect any client that is idle for more than the
configured timeout. When this happens, Eredis will automatically
reconnect. In other words, there will always be one open connection to
Redis for every client. If re-establishing the connection fails, the
client terminates.
When Eredis for some reason looses the connection to Redis, Eredis
will keep trying to reconnect until a connection is successfully
established, which includes the AUTH and SELECT calls. The sleep time
between attempts to reconnect is 100 milliseconds.

As long as the connection is down, Eredis will respond to any request
immediately with `{error, no_connection}` without actually trying to
connect. This serves as a kind of circuit breaker and prevents a
stampede of clients just waiting for a failed connection attempt or
`gen_server:call` timeout.

Note: If Eredis is starting up and cannot connect, it will fail
immediately with `{connection_error, Reason}`.

## AUTH and SELECT

Expand Down
78 changes: 59 additions & 19 deletions src/eredis_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
}).

-define(SOCKET_OPTS, [binary, {active, once}, {packet, raw}, {reuseaddr, true}]).
-define(RECONNECT_SLEEP, 100). %% Sleep between reconnect attempts, in milliseconds

%%
%% API
Expand Down Expand Up @@ -76,20 +77,15 @@ init([Host, Port, Database, Password]) ->
end.

handle_call({request, Req}, From, State) ->
case gen_tcp:send(State#state.socket, Req) of
ok ->
NewQueue = queue:in(From, State#state.queue),
{noreply, State#state{queue = NewQueue}};
{error, Reason} ->
{reply, {error, Reason}, State}
end;
do_request(Req, From, State);

handle_call(stop, _From, State) ->
{stop, normal, State};

handle_call(_Request, _From, State) ->
{reply, unknown_request, State}.


handle_cast(_Msg, State) ->
{noreply, State}.

Expand All @@ -98,23 +94,32 @@ handle_info({tcp, _Socket, Bs}, State) ->
inet:setopts(State#state.socket, [{active, once}]),
{noreply, handle_response(Bs, State)};

%% Socket got closed, for example by Redis terminating idle clients.
%% Reconnect and if it fails, stop the gen_server.
%% Socket got closed, for example by Redis terminating idle
%% clients. Spawn of a new process which will try to reconnect and
%% notify us when Redis is ready. In the meantime, we can respond with
%% an error message to all our clients.
handle_info({tcp_closed, _Socket}, State) ->
case connect(State) of
{ok, NewState} ->
%% Throw away the queue, as we will never get a response
%% for the requests sent on the old socket, ever
{noreply, NewState#state{queue = queue:new()}};
{error, Reason} ->
{stop, {reconnect_error, Reason}}
end;
Self = self(),
spawn(fun() -> reconnect_loop(Self, State) end),

%% Throw away the socket and the queue, as we will never get a
%% response to the requests sent on the old socket. The absence of
%% a socket is used to signal we are "down"
{noreply, State#state{socket = undefined, queue = queue:new()}};

%% Redis is ready to accept requests, the given Socket is a socket
%% already connected and authenticated.
handle_info({connection_ready, Socket}, #state{socket = undefined} = State) ->
{noreply, State#state{socket = Socket}};

handle_info(_Info, State) ->
{noreply, State}.
{stop, {unhandled_message, _Info}, State}.

terminate(_Reason, State) ->
gen_tcp:close(State#state.socket),
case State#state.socket of
undefined -> ok;
Socket -> gen_tcp:close(Socket)
end,
ok.

code_change(_OldVsn, State, _Extra) ->
Expand All @@ -124,6 +129,22 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%--------------------------------------------------------------------

-spec do_request(Req::iolist(), From::pid(), #state{}) ->
{noreply, #state{}} | {reply, Reply::any(), #state{}}.
%% @doc: Sends the given request to redis. If we do not have a
%% connection, returns error.
do_request(_Req, _From, #state{socket = undefined} = State) ->
{reply, {error, no_connection}, State};

do_request(Req, From, State) ->
case gen_tcp:send(State#state.socket, Req) of
ok ->
NewQueue = queue:in(From, State#state.queue),
{noreply, State#state{queue = NewQueue}};
{error, Reason} ->
{reply, {error, Reason}, State}
end.

-spec handle_response(Data::binary(), State::#state{}) -> NewState::#state{}.
%% @doc: Handle the response coming from Redis. This includes parsing
%% and replying to the correct client, handling partial responses,
Expand Down Expand Up @@ -213,3 +234,22 @@ do_sync_command(Socket, Command) ->
{error, Reason} ->
{error, Reason}
end.

%% @doc: Loop until a connection can be established, this includes
%% successfully issuing the auth and select calls. When we have a
%% connection, give the socket to the redis client.
reconnect_loop(Client, State) ->
case catch(connect(State)) of
{ok, #state{socket = Socket}} ->
gen_tcp:controlling_process(Socket, Client),
Client ! {connection_ready, Socket};
{error, _Reason} ->
timer:sleep(?RECONNECT_SLEEP),
reconnect_loop(Client, State);
%% Something bad happened when connecting, like Redis might be
%% loading the dataset and we got something other than 'OK' in
%% auth or select
{'EXIT', _} ->
timer:sleep(?RECONNECT_SLEEP),
reconnect_loop(Client, State)
end.

0 comments on commit 7fe20d7

Please sign in to comment.