Permalink
Browse files

Merge pull request #4 from cloudant/13311-improve-view-back-pressure

Implement message stream interface

BugzID: 13311
BugzID: 14075
  • Loading branch information...
2 parents c6b7778 + c3b0548 commit 8055f14304bc5505a7b49bfd6142ea53fcc00a17 @kocolosk kocolosk committed Jul 17, 2012
Showing with 58 additions and 0 deletions.
  1. +58 −0 src/rexi.erl
View
58 src/rexi.erl
@@ -18,6 +18,7 @@
-export([reply/1, sync_reply/1, sync_reply/2]).
-export([async_server_call/2, async_server_call/3]).
-export([get_errors/0, get_last_error/0, set_error_limit/1]).
+-export([stream/1, stream/2, stream/3, stream_ack/1, stream_ack/2]).
-include("rexi.hrl").
@@ -127,6 +128,63 @@ sync_reply(Reply, Timeout) ->
timeout
end.
+%% @equiv stream(Msg, 100, 300000)
+stream(Msg) ->
+ stream(Msg, 100, 300000).
+
+%% @equiv stream(Msg, Limit, 300000)
+stream(Msg, Limit) ->
+ stream(Msg, Limit, 300000).
+
+%% @doc convenience function to stream messages to caller while blocking when
+%% a specific number of messages are outstanding. Message is of the form
+%% {OriginalRef, self(), Reply}, which enables the original caller to ack.
+-spec stream(any(), integer(), pos_integer() | infinity) -> any().
+stream(Msg, Limit, Timeout) ->
+ try maybe_wait(Limit, Timeout) of
+ {ok, Count} ->
+ put(rexi_unacked, Count+1),
+ {Caller, Ref} = get(rexi_from),
+ erlang:send(Caller, {Ref, self(), Msg}),
+ ok
+ catch throw:timeout ->
+ timeout
+ end.
+
+%% @equiv stream_ack(Client, 1)
+stream_ack(Client) ->
+ erlang:send(Client, {rexi_ack, 1}).
+
+%% @doc Ack streamed messages
+stream_ack(Client, N) ->
+ erlang:send(Client, {rexi_ack, N}).
+
%% internal functions %%
cast_msg(Msg) -> {'$gen_cast', Msg}.
+
+maybe_wait(Limit, Timeout) ->
+ case get(rexi_unacked) of
+ undefined ->
+ {ok, 0};
+ Count when Count >= Limit ->
+ wait_for_ack(Count, Timeout);
+ Count ->
+ drain_acks(Count)
+ end.
+
+wait_for_ack(Count, Timeout) ->
+ receive
+ {rexi_ack, N} -> drain_acks(Count-N)
+ after Timeout ->
+ throw(timeout)
+ end.
+
+drain_acks(Count) when Count < 0 ->
+ erlang:error(mismatched_rexi_ack);
+drain_acks(Count) ->
+ receive
+ {rexi_ack, N} -> drain_acks(Count-N)
+ after 0 ->
+ {ok, Count}
+ end.

0 comments on commit 8055f14

Please sign in to comment.