Skip to content
Browse files

verx_client_tcp: accumulate messages in the mailbox

Use selective receive to queue all the messages in the caller's mailbox.
  • Loading branch information...
1 parent c844858 commit 38a3c39b8fd9719180af78d31d2a70fd48db9bad @msantos committed Sep 13, 2012
Showing with 28 additions and 33 deletions.
  1. +28 −33 src/verx_client_tcp.erl
View
61 src/verx_client_tcp.erl
@@ -38,6 +38,9 @@
-export([
call/2, call/3,
+ cast/2, cast/3, cast/4,
+ reply/2, reply/3,
+
recv/1, recv/2,
recvall/1, recvall/2,
@@ -66,18 +69,24 @@
call(Ref, Proc) ->
call(Ref, Proc, []).
call(Ref, Proc, Arg) when is_pid(Ref), is_atom(Proc), is_list(Arg) ->
- ok = gen_server:call(Ref, {call, Proc, Arg}, infinity),
- call_1(Ref).
-
-call_1(Ref) ->
+ {ok, Serial} = cast(Ref, Proc, Arg, infinity),
+ reply(Ref, Serial).
+
+cast(Ref, Proc) ->
+ cast(Ref, Proc, [], infinity).
+cast(Ref, Proc, Arg) ->
+ cast(Ref, Proc, Arg, infinity).
+cast(Ref, Proc, Arg, Timeout) ->
+ gen_server:call(Ref, {call, Proc, Arg}, Timeout).
+
+reply(Ref, Serial) ->
+ reply(Ref, Serial, infinity).
+reply(Ref, Serial, Timeout) ->
receive
- {verx, Ref, Reply} ->
- verx_rpc:status(Reply);
- {verx, Ref, {out_of_sync, _, _, _} = Reply} ->
- error_logger:error_report(Reply),
- call_1(Ref)
+ {verx, Ref, {#remote_message_header{serial = <<Serial:32>>, type = <<?REMOTE_REPLY:32>>}, _} = Reply} ->
+ verx_rpc:status(Reply)
after
- 5000 ->
+ Timeout ->
{error, eagain}
end.
@@ -94,9 +103,7 @@ recv(Ref, Timeout, Acc) ->
{verx, Ref, {#remote_message_header{
type = <<?REMOTE_STREAM:32>>,
status = <<?REMOTE_CONTINUE:32>>}, Payload}} ->
- recv(Ref, Timeout, [Payload|Acc]);
- {verx, Ref, Buf} ->
- error_logger:info_report([{got, Buf}])
+ recv(Ref, Timeout, [Payload|Acc])
after
Timeout ->
{error, eagain}
@@ -122,9 +129,7 @@ recvall(Ref, Timeout, Acc) ->
{verx, Ref, {#remote_message_header{
type = <<?REMOTE_STREAM:32>>,
status = <<?REMOTE_CONTINUE:32>>}, Payload}} ->
- recvall(Ref, Timeout, [Payload|Acc]);
- {verx, Ref, Buf} ->
- error_logger:info_report([{got, Buf}])
+ recvall(Ref, Timeout, [Payload|Acc])
after
Timeout ->
case length(Acc) of
@@ -193,7 +198,10 @@ handle_call({call, Proc, Arg}, _From, #state{
Message = verx_rpc:encode({Header#remote_message_header{
serial = <<Serial:32>>
}, Call}),
- Reply = send_rpc(Socket, Message),
+ Reply = case send_rpc(Socket, Message) of
+ ok -> {ok, Serial};
+ Error -> Error
+ end,
inet:setopts(Socket, [{active, once}]),
{reply, Reply, State#state{proc = Proc, serial = Serial+1}};
@@ -240,11 +248,10 @@ handle_cast(_Msg, State) ->
handle_info({tcp, Socket, Data},
#state{s = Socket,
pid = Pid,
- serial = Serial,
buf = Buf} = State) ->
inet:setopts(Socket, [{active, once}]),
{Msgs, Rest} = verx_client:stream(Data, Buf),
- [ reply_to_caller(Pid, Serial, Msg) || Msg <- Msgs ],
+ [ reply_to_caller(Pid, Msg) || Msg <- Msgs ],
{noreply, State#state{buf = Rest}};
handle_info({tcp_closed, Socket}, #state{s = Socket} = State) ->
@@ -285,19 +292,7 @@ send_rpc(Socket, Buf) ->
Len = ?REMOTE_MESSAGE_HEADER_XDR_LEN + byte_size(Buf),
gen_tcp:send(Socket, <<?UINT32(Len), Buf/binary>>).
-reply_to_caller(Pid, Serial0, Data) ->
+reply_to_caller(Pid, Data) ->
Reply = verx_rpc:decode(Data),
- {#remote_message_header{type = <<?UINT32(Type)>>,
- serial = <<?UINT32(RSerial)>>}, _} = Reply,
-
- % serial increments on every call
- Serial = Serial0 - 1,
- case {Type, RSerial} of
- {N, Serial} when N =:= ?REMOTE_REPLY; N =:= ?REMOTE_STREAM ->
- Pid ! {verx, self(), Reply};
- {?REMOTE_MESSAGE, 0} ->
- Pid ! {verx, self(), Reply};
- _ ->
- Pid ! {verx, self(), {out_of_sync, RSerial, Serial, Reply}}
- end,
+ Pid ! {verx, self(), Reply},
ok.

0 comments on commit 38a3c39

Please sign in to comment.
Something went wrong with that request. Please try again.