Permalink
Browse files

Handle concurrency with pipeline patch.

  • Loading branch information...
1 parent bfc9f1d commit 52addf96719115c6b56c6efd695878cbf8b5155f Knut Nesheim committed Oct 12, 2011
Showing with 22 additions and 4 deletions.
  1. +3 −2 src/eredis.erl
  2. +4 −2 src/eredis_client.erl
  3. +15 −0 test/eredis_tests.erl
View
@@ -58,7 +58,7 @@ start_link(Args) ->
-spec q(Client::pid(), Command::iolist()) ->
- {ok, return_value()} | {error, Reason::binary()}.
+ {ok, return_value()} | {error, Reason::binary() | no_connection}.
%% @doc: Executes the given command in the specified connection. The
%% command must be a valid Redis command and may contain arbitrary
%% data which will be converted to binaries. The returned values will
@@ -71,7 +71,8 @@ q(Client, Command, Timeout) ->
-spec qp(Client::pid(), Pipeline::pipeline()) ->
- {ok, return_value()} | {error, Reason::binary()}.
+ [{ok, return_value()} | {error, Reason::binary()}] |
+ {error, no_connection}.
%% @doc: Executes the given pipeline (list of commands) in the
%% specified connection. The commands must be valid Redis commands and
%% may contain arbitrary data which will be converted to binaries. The
View
@@ -208,7 +208,9 @@ handle_response(Data, #state{parser_state = ParserState,
end.
%% @doc: Sends a value to the first client in queue. Returns the new
-%% queue without this client.
+%% queue without this client. If we are still waiting for parts of a
+%% pipelined request, push the reply to the the head of the queue and
+%% wait for another reply from redis.
reply(Value, Queue) ->
case queue:out(Queue) of
{{value, {1, From}}, NewQueue} ->
@@ -218,7 +220,7 @@ reply(Value, Queue) ->
gen_server:reply(From, lists:reverse([Value | Replies])),
NewQueue;
{{value, {N, From, Replies}}, NewQueue} when N > 1 ->
- queue:in({N - 1, From, [Value | Replies]}, NewQueue);
+ queue:in_r({N - 1, From, [Value | Replies]}, NewQueue);
{empty, Queue} ->
%% Oops
error_logger:info_msg("Nothing in queue, but got value from parser~n"),
View
@@ -75,6 +75,21 @@ pipeline_test() ->
?assertMatch({ok, _}, eredis:q(C, ["DEL", a, b])).
+pipeline_mixed_test() ->
+ C = c(),
+ P1 = [["LPUSH", c, "1"] || _ <- lists:seq(1, 100)],
+ P2 = [["LPUSH", d, "1"] || _ <- lists:seq(1, 100)],
+ Expect = [{ok, list_to_binary(integer_to_list(I))} || I <- lists:seq(1, 100)],
+ spawn(fun () ->
+ erlang:yield(),
+ ?assertEqual(Expect, eredis:qp(C, P1))
+ end),
+ spawn(fun () ->
+ ?assertEqual(Expect, eredis:qp(C, P2))
+ end),
+ timer:sleep(10),
+ ?assertMatch({ok, _}, eredis:q(C, ["DEL", c, d])).
+
c() ->
Res = eredis:start_link(),

0 comments on commit 52addf9

Please sign in to comment.