Navigation Menu

Skip to content
This repository has been archived by the owner on Jul 16, 2020. It is now read-only.

Commit

Permalink
Reduce the request and reply services to loops as well.
Browse files Browse the repository at this point in the history
  • Loading branch information
squaremo committed Nov 10, 2010
1 parent b28b3c5 commit 8469f00
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 83 deletions.
87 changes: 45 additions & 42 deletions src/r0mq_rep_service.erl
Expand Up @@ -5,18 +5,14 @@
%% See http://wiki.github.com/rabbitmq/rmq-0mq/reqrep

%% Callbacks
-export([init/3, create_socket/0,
start_listening/2, zmq_message/3, amqp_message/5]).
-export([init/3, create_socket/0, start_listening/3]).

-include_lib("amqp_client/include/amqp_client.hrl").

-record(state, {req_queue, % used as routing key for requests
req_exchange, % exchange to which to send requests
rep_exchange, % (probably default) exchange to send replies
rep_queue, % (private) queue from which to collect replies
incoming_state = path, % state of multipart request message
incoming_path = [],
reply_tag = none % consumer tag for reply queue
rep_queue % (private) queue from which to collect replies
}).

%% -- Callbacks --
Expand All @@ -28,7 +24,7 @@
%% sending, we just do it all in one go.

create_socket() ->
{ok, In} = zmq:socket(xrep, [{active, true}]),
{ok, In} = zmq:socket(xrep, [{active, false}]),
In.

init(Options, Connection, ConsumeChannel) ->
Expand All @@ -51,26 +47,43 @@ init(Options, Connection, ConsumeChannel) ->
rep_exchange = RepExchange,
rep_queue = RepQueueName}}.

start_listening(Channel, State = #state{rep_queue = RepQueue}) ->
start_listening(Channel, Sock, State = #state{rep_queue = RepQueue}) ->
ConsumeRep = #'basic.consume'{ queue = RepQueue,
no_ack = true,
exclusive = true },
#'basic.consume_ok'{consumer_tag = RepTag } =
amqp_channel:subscribe(Channel, ConsumeRep, self()),
{ok, State#state{ reply_tag = RepTag }}.
_Pid = spawn_link(fun() ->
#'basic.consume_ok'{consumer_tag = RepTag } =
amqp_channel:subscribe(Channel, ConsumeRep, self()),
receive
#'basic.consume_ok'{} -> ok
end,
response_loop(Channel, Sock, State)
end),
_Pid2 = spawn_link(fun() ->
request_loop(Channel, Sock, State, [], path)
end),
{ok, State}.

response_loop(Channel, Sock, State) ->
receive
{#'basic.deliver'{},
#amqp_msg{ payload = Payload,
props = Props }} ->
#'P_basic'{correlation_id = CorrelationId} = Props,
Path = decode_path(CorrelationId),
lists:foreach(fun (PathElement) ->
zmq:send(Sock, PathElement, [sndmore])
end, Path),
zmq:send(Sock, <<>>, [sndmore]),
zmq:send(Sock, Payload),
response_loop(Channel, Sock, State)
end.

%% If we get a zero-length payload, it means we've got the path, and
%% the next is the request payload.
zmq_message(<<>>, _Channel, State = #state{incoming_state = path}) ->
{ok, State#state{incoming_state = payload}};
zmq_message(Data, _Channel, State = #state{incoming_state = path,
incoming_path = Path}) ->
{ok, State#state{incoming_path = [Data | Path]}};
zmq_message(Data, Channel, State = #state{incoming_state = payload,
incoming_path = Path,
req_exchange = Exchange,
req_queue = Queue,
rep_queue = ReplyQueue}) ->
request_loop(Channel, Sock, State = #state{ req_queue = Queue,
rep_queue = ReplyQueue,
req_exchange = Exchange },
Path, payload) ->
{ok, Data} = zmq:recv(Sock),
CorrelationId = encode_path(Path),
Msg = #amqp_msg{payload = Data,
props = #'P_basic'{
Expand All @@ -79,25 +92,15 @@ zmq_message(Data, Channel, State = #state{incoming_state = payload,
Pub = #'basic.publish'{ exchange = Exchange,
routing_key = Queue },
amqp_channel:cast(Channel, Pub, Msg),
{ok, State#state{incoming_state = path, incoming_path = []}}.

amqp_message(#'basic.deliver'{consumer_tag = Tag},
#amqp_msg{ payload = Payload, props = Props },
Sock, _Channel, State) ->
%% A reply. Since it's for us, the correlation id will be
%% our encoded correlation id
#'P_basic'{correlation_id = CorrelationId} = Props,
io:format("Reply received ~p (corr id ~p)~n", [Payload, CorrelationId]),
Path = decode_path(CorrelationId),
lists:foreach(fun (PathElement) ->
io:format("Sending ~p~n", [PathElement]),
zmq:send(Sock, PathElement, [sndmore])
end, Path),
io:format("Sending <<>>~n", []),
zmq:send(Sock, <<>>, [sndmore]),
io:format("Sending ~p~n", [Payload]),
zmq:send(Sock, Payload),
{ok, State}.
request_loop(Channel, Sock, State, [], path);
request_loop(Channel, Sock, State, Path, path) ->
{ok, Msg} = zmq:recv(Sock),
case Msg of
<<>> ->
request_loop(Channel, Sock, State, Path, payload);
PathElem ->
request_loop(Channel, Sock, State, [ PathElem | Path ], path)
end.

%% FIXME only deal with one for the minute
encode_path([Id]) ->
Expand Down
88 changes: 47 additions & 41 deletions src/r0mq_req_service.erl
Expand Up @@ -5,17 +5,13 @@
%% See http://wiki.github.com/rabbitmq/rmq-0mq/reqrep

%% Callbacks
-export([init/3, create_socket/0,
start_listening/2, zmq_message/3, amqp_message/5]).
-export([init/3, create_socket/0, start_listening/3]).

-include_lib("amqp_client/include/amqp_client.hrl").

-record(state, {req_exchange, % (probably default) exchange from which we get requests
req_queue, % (probably shared) queue from which we retrieve requests
rep_exchange, % (probably default) exchange to send replies
outgoing_state = path, % state of multipart reply message
outgoing_path = [],
request_tag = none % consumer tag for request queue
rep_exchange % (probably default) exchange to send replies
}).

%% -- Callbacks --
Expand All @@ -27,7 +23,7 @@
%% sending, we just do it all in one go.

create_socket() ->
{ok, Out} = zmq:socket(xreq, [{active, true}]),
{ok, Out} = zmq:socket(xreq, [{active, false}]),
Out.

init(Options, Connection, _ConsumeChannel) ->
Expand All @@ -47,22 +43,45 @@ init(Options, Connection, _ConsumeChannel) ->
rep_exchange = RepExchange,
req_queue = ReqQueueName}}.

start_listening(Channel, State = #state{req_queue = ReqQueue}) ->
start_listening(Channel, Sock, State = #state{req_queue = ReqQueue}) ->
ConsumeReq = #'basic.consume'{ queue = ReqQueue,
no_ack = true,
exclusive = false },
#'basic.consume_ok'{consumer_tag = ReqTag } =
amqp_channel:subscribe(Channel, ConsumeReq, self()),
{ok, State#state{ request_tag = ReqTag }}.
%% We are listening for two things:
%% Firstly, deliveries from our request queue, which are forwarded to
%% the outgoing port; second is incoming responses, which are forwarded
%% to the (decoded) reply-to queue.
_Pid = spawn_link(fun() ->
amqp_channel:subscribe(Channel, ConsumeReq, self()),
receive
#'basic.consume_ok'{} -> ok
end,
request_loop(Channel, Sock, State)
end),
_Pid2 = spawn_link(fun() ->
response_loop(Channel, Sock, State, [], path)
end),
{ok, State}.

request_loop(Channel, Sock, State) ->
receive
{#'basic.deliver'{},
#amqp_msg{ payload = Payload, props = Props }} ->
#'P_basic'{correlation_id = CorrelationId,
reply_to = ReplyTo } = Props,
case CorrelationId of
undefined -> no_send;
Id -> zmq:send(Sock, Id, [sndmore])
end,
zmq:send(Sock, ReplyTo, [sndmore]),
zmq:send(Sock, <<>>, [sndmore]),
zmq:send(Sock, Payload),
request_loop(Channel, Sock, State)
end.

zmq_message(<<>>, _Channel, State = #state{outgoing_state = path}) ->
{ok, State#state{outgoing_state = payload}};
zmq_message(Data, _Channel, State = #state{outgoing_state = path,
outgoing_path = Path}) ->
{ok, State#state{outgoing_path = [Data | Path]}};
zmq_message(Data, Channel, State = #state{outgoing_state = payload,
outgoing_path = Path,
req_exchange = Exchange}) ->
response_loop(Channel, Sock, State = #state{rep_exchange = Exchange},
Path, payload) ->
{ok, Data} = zmq:recv(Sock),
[ ReplyTo | Rest ] = Path,
CorrelationId = case Rest of
[] -> undefined;
Expand All @@ -75,25 +94,12 @@ zmq_message(Data, Channel, State = #state{outgoing_state = payload,
Pub = #'basic.publish'{ exchange = Exchange,
routing_key = ReplyTo },
amqp_channel:cast(Channel, Pub, Msg),
{ok, State#state{outgoing_state = path, outgoing_path = []}}.

amqp_message(#'basic.deliver'{consumer_tag = Tag},
#amqp_msg{ payload = Payload, props = Props },
Sock,
_Channel,
State = #state{request_tag = ReqTag}) ->
%% A request, either from an AMQP client, or us
#'P_basic'{correlation_id = CorrelationId,
reply_to = ReplyTo} = Props,
io:format("Request received ~p (corr id ~p)~n", [Payload, CorrelationId]),
case CorrelationId of
undefined -> no_send;
Id -> zmq:send(Sock, Id, [sndmore])
end,
io:format("Sending: ~p~n", [ReplyTo]),
zmq:send(Sock, ReplyTo, [sndmore]),
io:format("Sending: <<>>~n", []),
zmq:send(Sock, <<>>, [sndmore]),
io:format("Sending: ~p~n", [Payload]),
zmq:send(Sock, Payload),
{ok, State}.
response_loop(Channel, Sock, State, [], path);
response_loop(Channel, Sock, State, Path, path) ->
{ok, Msg} = zmq:recv(Sock),
case Msg of
<<>> ->
response_loop(Channel, Sock, State, Path, payload);
PathElem ->
response_loop(Channel, Sock, State, [ PathElem | Path ], path)
end.

0 comments on commit 8469f00

Please sign in to comment.