Skip to content
This repository has been archived by the owner on Jul 16, 2020. It is now read-only.

Commit

Permalink
Rename services to reflect the underlying socket, to avoid another le…
Browse files Browse the repository at this point in the history
…vel of meaning flipping. The pub service has a pub socket and you connect sub sockets to it. Easy. (Also, add a file that had slipped through the net. Ops.)
  • Loading branch information
squaremo committed Sep 16, 2010
1 parent 6ff44de commit ba5816b
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 26 deletions.
4 changes: 2 additions & 2 deletions examples/pipeline/rabbitmq.config
@@ -1,4 +1,4 @@
[{r0mq,
[{services,
[{<<"PIPELINE">>, push, "tcp://127.0.0.1:5557"},
{<<"PIPELINE">>, pull, "tcp://127.0.0.1:5558"}]}]}].
[{<<"PIPELINE">>, pull, "tcp://127.0.0.1:5557"},
{<<"PIPELINE">>, push, "tcp://127.0.0.1:5558"}]}]}].
4 changes: 2 additions & 2 deletions examples/pubsub/rabbitmq.config
@@ -1,4 +1,4 @@
[{r0mq,
[{services,
[{<<"PUBSUB">>, pub, "tcp://127.0.0.1:5555"},
{<<"PUBSUB">>, sub, "tcp://127.0.0.1:5556"}]}]}].
[{<<"PUBSUB">>, sub, "tcp://127.0.0.1:5555"},
{<<"PUBSUB">>, pub, "tcp://127.0.0.1:5556"}]}]}].
4 changes: 2 additions & 2 deletions examples/reqrep/rabbitmq.config
@@ -1,4 +1,4 @@
[{r0mq,
[{services,
[{<<"REQREP">>, req, "tcp://127.0.0.1:5559"},
{<<"REQREP">>, rep, "tcp://127.0.0.1:5560"}]}]}].
[{<<"REQREP">>, rep, "tcp://127.0.0.1:5559"},
{<<"REQREP">>, req, "tcp://127.0.0.1:5560"}]}]}].
50 changes: 50 additions & 0 deletions src/r0mq_pub_service.erl
@@ -0,0 +1,50 @@
-module(r0mq_pub_service).

%% A pub service.
%%
%% See http://wiki.github.com/rabbitmq/rmq-0mq/pubsub

%% Callbacks
-export([init/3, create_socket/0,
start_listening/2, zmq_message/3, amqp_message/5]).

-include_lib("amqp_client/include/amqp_client.hrl").

-record(state, {queue}).

%% -- Callbacks --

create_socket() ->
{ok, Out} = zmq:socket(pub, [{active, true}]),
Out.

init(Options, Connection, ConsumeChannel) ->
Exchange = case r0mq_util:get_option(name, Options) of
missing -> throw({?MODULE, no_name_supplied, Options});
Name -> Name
end,
case r0mq_util:ensure_exchange(Exchange, <<"fanout">>, Connection) of
{error, _, _Spec} ->
%io:format("Error declaring exchange ~p~n", [Spec]),
throw({cannot_declare_exchange, Exchange});
{ok, Exchange} ->
%io:format("Exchange OK ~p~n", [Exchange]),
{ok, Queue} = r0mq_util:create_bind_private_queue(
Exchange, <<"">>, ConsumeChannel),
%io:format("Using queue ~p~n", [Queue]),
{ok, #state{queue = Queue}}
end.

start_listening(Channel, State = #state{queue = Queue}) ->
Consume = #'basic.consume'{ queue = Queue,
no_ack = true,
exclusive = true },
amqp_channel:subscribe(Channel, Consume, self()),
{ok, State}.

zmq_message(Data, _Channel, _State) ->
throw({?MODULE, unexpected_zmq_message, Data}).

amqp_message(_Env, #amqp_msg{ payload = Payload }, Sock, _Channel, State) ->
zmq:send(Sock, Payload),
{ok, State}.
4 changes: 2 additions & 2 deletions src/r0mq_push.erl → src/r0mq_pull_service.erl
@@ -1,6 +1,6 @@
-module(r0mq_push).
-module(r0mq_pull_service).

%% A pipeline push (downstream) service.
%% A pipeline pull (upstream) service.
%%
%% See http://wiki.github.com/rabbitmq/rmq-0mq/pipeline

Expand Down
4 changes: 2 additions & 2 deletions src/r0mq_pull.erl → src/r0mq_push_service.erl
@@ -1,6 +1,6 @@
-module(r0mq_pull).
-module(r0mq_push_service).

%% A pull (upstream) pipeline service.
%% A pipeline push (downstream) service.
%%
%% See http://wiki.github.com/rabbitmq/rmq-0mq/pipeline

Expand Down
6 changes: 2 additions & 4 deletions src/r0mq_req.erl → src/r0mq_rep_service.erl
@@ -1,8 +1,6 @@
-module(r0mq_req).
-module(r0mq_rep_service).

%% The request service. The socket accepts
%% requests and sends responses (i.e., it is connected to by req
%% sockets.
%% A reply service.
%%
%% See http://wiki.github.com/rabbitmq/rmq-0mq/reqrep

Expand Down
6 changes: 2 additions & 4 deletions src/r0mq_rep.erl → src/r0mq_req_service.erl
@@ -1,8 +1,6 @@
-module(r0mq_rep).
-module(r0mq_req_service).

%% The reply service. The socket is sent requests and
%% accepts replies (i.e., it is connected to by rep
%% sockets.
%% A request service.
%%
%% See http://wiki.github.com/rabbitmq/rmq-0mq/reqrep

Expand Down
4 changes: 2 additions & 2 deletions src/r0mq_pub.erl → src/r0mq_sub_service.erl
@@ -1,6 +1,6 @@
-module(r0mq_pub).
-module(r0mq_sub_service).

%% A pub service.
%% A sub service.
%%
%% See http://wiki.github.com/rabbitmq/rmq-0mq/pubsub

Expand Down
12 changes: 6 additions & 6 deletions src/r0mq_sup.erl
Expand Up @@ -60,12 +60,12 @@ child_spec(S = {Rendezvous, Type, Address, Options}) ->
end
end.

module_for_type(pub) -> r0mq_pub;
module_for_type(sub) -> r0mq_sub;
module_for_type(req) -> r0mq_req;
module_for_type(rep) -> r0mq_rep;
module_for_type(push) -> r0mq_push;
module_for_type(pull) -> r0mq_pull;
module_for_type(pub) -> r0mq_pub_service;
module_for_type(sub) -> r0mq_sub_service;
module_for_type(req) -> r0mq_req_service;
module_for_type(rep) -> r0mq_rep_service;
module_for_type(push) -> r0mq_push_service;
module_for_type(pull) -> r0mq_pull_service;
module_for_type(_) -> no_such_type.

full_address(Address) when is_list(Address) ->
Expand Down

0 comments on commit ba5816b

Please sign in to comment.