Permalink
Browse files

Simplify all services and the service gen_server now that they are co…

…nverted to loops. The gen_server is not threading state through -- it's just looking after params.
  • Loading branch information...
1 parent 8469f00 commit 8b26e3f4d7951be8a20ec46e9a7d609c933d151e @squaremo squaremo committed Nov 10, 2010
Showing with 104 additions and 133 deletions.
  1. +13 −16 src/r0mq_pub_service.erl
  2. +10 −10 src/r0mq_pull_service.erl
  3. +12 −22 src/r0mq_push_service.erl
  4. +27 −27 src/r0mq_rep_service.erl
  5. +22 −20 src/r0mq_req_service.erl
  6. +10 −28 src/r0mq_service.erl
  7. +10 −10 src/r0mq_sub_service.erl
View
@@ -10,7 +10,7 @@
-include_lib("amqp_client/include/amqp_client.hrl").
--record(state, {queue}).
+-record(params, {queue}).
%% -- Callbacks --
@@ -29,10 +29,10 @@ init(Options, Connection, ConsumeChannel) ->
{ok, Exchange} ->
{ok, Queue} = r0mq_util:create_bind_private_queue(
Exchange, <<"">>, ConsumeChannel),
- {ok, #state{queue = Queue}}
+ {ok, #params{queue = Queue}}
end.
-start_listening(Channel, Sock, State = #state{queue = Queue}) ->
+start_listening(Channel, Sock, Params = #params{queue = Queue}) ->
%% We use prefetch here for flow control
amqp_channel:call(Channel, #'basic.qos'{prefetch_count = 100}),
Consume = #'basic.consume'{ queue = Queue,
@@ -44,20 +44,17 @@ start_listening(Channel, Sock, State = #state{queue = Queue}) ->
receive
#'basic.consume_ok'{} -> ok
end,
- loop(Channel, Sock, State)
+ loop(Channel, Sock, Params)
end),
- {ok, State}.
+ {ok, Params}.
-loop(Channel, Sock, State) ->
+loop(Channel, Sock, Params) ->
receive
- {#'basic.deliver'{delivery_tag = Tag}, Msg} ->
- {ok, State1} = send_message(Msg, Sock, State),
+ {#'basic.deliver'{delivery_tag = Tag},
+ #amqp_msg{ payload = Payload}} ->
+ ok = zmq:send(Sock, Payload),
amqp_channel:cast(Channel,
- #'basic.ack'{delivery_tag = Tag, multiple = false}),
- loop(Channel, Sock, State1)
- end.
-
-send_message(#amqp_msg{payload = Payload}, Sock, State) ->
- case zmq:send(Sock, Payload) of
- ok -> {ok, State}
- end.
+ #'basic.ack'{delivery_tag = Tag,
+ multiple = false})
+ end,
+ loop(Channel, Sock, Params).
View
@@ -9,7 +9,7 @@
-include_lib("amqp_client/include/amqp_client.hrl").
--record(state, {queue}).
+-record(params, {queue}).
%% -- Callbacks --
@@ -24,23 +24,23 @@ init(Options, Connection, ConsumeChannel) ->
QName when is_binary(QName) ->
r0mq_util:ensure_shared_queue(
QName, <<"">>, queue, Connection),
- {ok, #state{queue = QName}}
+ {ok, #params{queue = QName}}
end.
-start_listening(Channel, Sock, State) ->
+start_listening(Channel, Sock, Params) ->
_Pid = spawn_link(fun () ->
- loop(Channel, Sock, State)
+ loop(Channel, Sock, Params)
end),
- {ok, State}.
+ {ok, Params}.
-loop(Channel, Sock, State) ->
+loop(Channel, Sock, Params) ->
{ok, Msg} = zmq:recv(Sock),
- {ok, State1} = publish_message(Msg, Channel, State),
- loop(Channel, Sock, State1).
+ ok = publish_message(Msg, Channel, Params),
+ loop(Channel, Sock, Params).
-publish_message(Data, Channel, State = #state{queue = Queue }) ->
+publish_message(Data, Channel, #params{queue = Queue}) ->
Msg = #amqp_msg{payload = Data},
Pub = #'basic.publish'{ exchange = <<"">>,
routing_key = Queue },
amqp_channel:cast(Channel, Pub, Msg),
- {ok, State}.
+ ok.
View
@@ -5,16 +5,11 @@
%% See http://wiki.github.com/rabbitmq/rmq-0mq/pipeline
%% Callbacks
-%-export([init/1, terminate/2, code_change/3,
-% handle_call/3, handle_cast/2, handle_info/2]).
-
-
-%% Callbacks
-export([init/3, create_socket/0, start_listening/3]).
-include_lib("amqp_client/include/amqp_client.hrl").
--record(state, {queue}).
+-record(params, {queue}).
%% -- Callbacks --
@@ -29,10 +24,10 @@ init(Options, Connection, ConsumeChannel) ->
QName when is_binary(QName) ->
r0mq_util:ensure_shared_queue(
QName, <<"">>, queue, Connection),
- {ok, #state{queue = QName}}
+ {ok, #params{queue = QName}}
end.
-start_listening(Channel, Sock, State = #state{queue = Queue}) ->
+start_listening(Channel, Sock, Params = #params{queue = Queue}) ->
%% We use acking and basic.qos as a HWM for our process mailbox, so we don't
%% pile things up in the (unbounded) process mailbox.
amqp_channel:call(Channel, #'basic.qos'{prefetch_count = 100}),
@@ -45,20 +40,15 @@ start_listening(Channel, Sock, State = #state{queue = Queue}) ->
receive
#'basic.consume_ok'{} -> ok
end,
- loop(Channel, Sock, State)
+ loop(Channel, Sock, Params)
end),
- {ok, State}.
+ {ok, Params}.
-loop(Channel, Sock, State) ->
+loop(Channel, Sock, Params) ->
receive
- {#'basic.deliver'{delivery_tag = Tag}, Msg} ->
- {ok, State1} = send_message(Msg, Sock, State),
- amqp_channel:cast(Channel,
- #'basic.ack'{delivery_tag = Tag, multiple = false}),
- loop(Channel, Sock, State1)
- end.
-
-send_message(#amqp_msg{ payload = Payload }, Sock, State) ->
- case zmq:send(Sock, Payload) of
- ok -> {ok, State}
- end.
+ {#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{ payload = Msg} } ->
+ ok = zmq:send(Sock, Msg),
+ amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag,
+ multiple = false})
+ end,
+ loop(Channel, Sock, Params).
View
@@ -9,11 +9,10 @@
-include_lib("amqp_client/include/amqp_client.hrl").
--record(state, {req_queue, % used as routing key for requests
- req_exchange, % exchange to which to send requests
- rep_exchange, % (probably default) exchange to send replies
- rep_queue % (private) queue from which to collect replies
- }).
+-record(params, {req_queue, % used as routing key for requests
+ req_exchange, % exchange to which to send requests
+ rep_queue % (private) queue from which to collect replies
+ }).
%% -- Callbacks --
@@ -27,7 +26,7 @@ create_socket() ->
{ok, In} = zmq:socket(xrep, [{active, false}]),
In.
-init(Options, Connection, ConsumeChannel) ->
+init(Options, _Connection, ConsumeChannel) ->
%% We MUST have a request queue name to use as a routing key;
%% there's no point in constructing a private queue, because
%% no-one will be listening to it.
@@ -42,31 +41,30 @@ init(Options, Connection, ConsumeChannel) ->
RepExchange = <<"">>,
{ok, RepQueueName} = r0mq_util:create_bind_private_queue(
RepExchange, queue, ConsumeChannel),
- {ok, #state{ req_queue = ReqQueueName,
- req_exchange = ReqExchange,
- rep_exchange = RepExchange,
- rep_queue = RepQueueName}}.
+ {ok, #params{ req_queue = ReqQueueName,
+ req_exchange = ReqExchange,
+ rep_queue = RepQueueName}}.
-start_listening(Channel, Sock, State = #state{rep_queue = RepQueue}) ->
+start_listening(Channel, Sock, Params = #params{rep_queue = RepQueue}) ->
ConsumeRep = #'basic.consume'{ queue = RepQueue,
- no_ack = true,
+ no_ack = false,
exclusive = true },
_Pid = spawn_link(fun() ->
- #'basic.consume_ok'{consumer_tag = RepTag } =
+ #'basic.consume_ok'{} =
amqp_channel:subscribe(Channel, ConsumeRep, self()),
receive
#'basic.consume_ok'{} -> ok
end,
- response_loop(Channel, Sock, State)
+ response_loop(Channel, Sock, Params)
end),
_Pid2 = spawn_link(fun() ->
- request_loop(Channel, Sock, State, [], path)
+ request_loop(Channel, Sock, Params, [], path)
end),
- {ok, State}.
+ {ok, Params}.
-response_loop(Channel, Sock, State) ->
+response_loop(Channel, Sock, Params) ->
receive
- {#'basic.deliver'{},
+ {#'basic.deliver'{ delivery_tag = Tag },
#amqp_msg{ payload = Payload,
props = Props }} ->
#'P_basic'{correlation_id = CorrelationId} = Props,
@@ -76,12 +74,14 @@ response_loop(Channel, Sock, State) ->
end, Path),
zmq:send(Sock, <<>>, [sndmore]),
zmq:send(Sock, Payload),
- response_loop(Channel, Sock, State)
-end.
+ amqp_channel:cast(Channel, #'basic.ack'{ delivery_tag = Tag,
+ multiple = false })
+ end,
+ response_loop(Channel, Sock, Params).
-request_loop(Channel, Sock, State = #state{ req_queue = Queue,
- rep_queue = ReplyQueue,
- req_exchange = Exchange },
+request_loop(Channel, Sock, Params = #params{ req_queue = Queue,
+ rep_queue = ReplyQueue,
+ req_exchange = Exchange },
Path, payload) ->
{ok, Data} = zmq:recv(Sock),
CorrelationId = encode_path(Path),
@@ -92,14 +92,14 @@ request_loop(Channel, Sock, State = #state{ req_queue = Queue,
Pub = #'basic.publish'{ exchange = Exchange,
routing_key = Queue },
amqp_channel:cast(Channel, Pub, Msg),
- request_loop(Channel, Sock, State, [], path);
-request_loop(Channel, Sock, State, Path, path) ->
+ request_loop(Channel, Sock, Params, [], path);
+request_loop(Channel, Sock, Params, Path, path) ->
{ok, Msg} = zmq:recv(Sock),
case Msg of
<<>> ->
- request_loop(Channel, Sock, State, Path, payload);
+ request_loop(Channel, Sock, Params, Path, payload);
PathElem ->
- request_loop(Channel, Sock, State, [ PathElem | Path ], path)
+ request_loop(Channel, Sock, Params, [ PathElem | Path ], path)
end.
%% FIXME only deal with one for the minute
View
@@ -9,9 +9,9 @@
-include_lib("amqp_client/include/amqp_client.hrl").
--record(state, {req_exchange, % (probably default) exchange from which we get requests
- req_queue, % (probably shared) queue from which we retrieve requests
- rep_exchange % (probably default) exchange to send replies
+-record(params, {req_exchange, % (probably default) exchange through which we get requests
+ req_queue, % (probably shared) queue from which we retrieve requests
+ rep_exchange % (probably default) exchange to send replies
}).
%% -- Callbacks --
@@ -39,13 +39,13 @@ init(Options, Connection, _ConsumeChannel) ->
ReqExchange = <<"">>,
RepExchange = <<"">>,
r0mq_util:ensure_shared_queue(ReqQueueName, ReqExchange, queue, Connection),
- {ok, #state{ req_exchange = ReqExchange,
- rep_exchange = RepExchange,
- req_queue = ReqQueueName}}.
+ {ok, #params{ req_exchange = ReqExchange,
+ rep_exchange = RepExchange,
+ req_queue = ReqQueueName}}.
-start_listening(Channel, Sock, State = #state{req_queue = ReqQueue}) ->
+start_listening(Channel, Sock, Params = #params{req_queue = ReqQueue}) ->
ConsumeReq = #'basic.consume'{ queue = ReqQueue,
- no_ack = true,
+ no_ack = false,
exclusive = false },
%% We are listening for two things:
%% Firstly, deliveries from our request queue, which are forwarded to
@@ -56,16 +56,16 @@ start_listening(Channel, Sock, State = #state{req_queue = ReqQueue}) ->
receive
#'basic.consume_ok'{} -> ok
end,
- request_loop(Channel, Sock, State)
+ request_loop(Channel, Sock, Params)
end),
_Pid2 = spawn_link(fun() ->
- response_loop(Channel, Sock, State, [], path)
+ response_loop(Channel, Sock, Params, [], path)
end),
- {ok, State}.
+ {ok, Params}.
-request_loop(Channel, Sock, State) ->
+request_loop(Channel, Sock, Params) ->
receive
- {#'basic.deliver'{},
+ {#'basic.deliver'{ delivery_tag = Tag },
#amqp_msg{ payload = Payload, props = Props }} ->
#'P_basic'{correlation_id = CorrelationId,
reply_to = ReplyTo } = Props,
@@ -76,10 +76,12 @@ request_loop(Channel, Sock, State) ->
zmq:send(Sock, ReplyTo, [sndmore]),
zmq:send(Sock, <<>>, [sndmore]),
zmq:send(Sock, Payload),
- request_loop(Channel, Sock, State)
- end.
+ amqp_channel:cast(Channel, #'basic.ack'{ delivery_tag = Tag,
+ multiple = false })
+ end,
+ request_loop(Channel, Sock, Params).
-response_loop(Channel, Sock, State = #state{rep_exchange = Exchange},
+response_loop(Channel, Sock, Params = #params{rep_exchange = Exchange},
Path, payload) ->
{ok, Data} = zmq:recv(Sock),
[ ReplyTo | Rest ] = Path,
@@ -94,12 +96,12 @@ response_loop(Channel, Sock, State = #state{rep_exchange = Exchange},
Pub = #'basic.publish'{ exchange = Exchange,
routing_key = ReplyTo },
amqp_channel:cast(Channel, Pub, Msg),
- response_loop(Channel, Sock, State, [], path);
-response_loop(Channel, Sock, State, Path, path) ->
+ response_loop(Channel, Sock, Params, [], path);
+response_loop(Channel, Sock, Params, Path, path) ->
{ok, Msg} = zmq:recv(Sock),
case Msg of
<<>> ->
- response_loop(Channel, Sock, State, Path, payload);
+ response_loop(Channel, Sock, Params, Path, payload);
PathElem ->
- response_loop(Channel, Sock, State, [ PathElem | Path ], path)
+ response_loop(Channel, Sock, Params, [ PathElem | Path ], path)
end.
Oops, something went wrong.

0 comments on commit 8b26e3f

Please sign in to comment.