From ba5816b92f31c9038fd26abbd6b26ba2e86c6df9 Mon Sep 17 00:00:00 2001 From: Michael Bridgen Date: Thu, 16 Sep 2010 18:20:07 +0100 Subject: [PATCH] Rename services to reflect the underlying socket, to avoid another level of meaning flipping. The pub service has a pub socket and you connect sub sockets to it. Easy. (Also, add a file that had slipped through the net. Ops.) --- examples/pipeline/rabbitmq.config | 4 +- examples/pubsub/rabbitmq.config | 4 +- examples/reqrep/rabbitmq.config | 4 +- src/r0mq_pub_service.erl | 50 ++++++++++++++++++++ src/{r0mq_push.erl => r0mq_pull_service.erl} | 4 +- src/{r0mq_pull.erl => r0mq_push_service.erl} | 4 +- src/{r0mq_req.erl => r0mq_rep_service.erl} | 6 +-- src/{r0mq_rep.erl => r0mq_req_service.erl} | 6 +-- src/{r0mq_pub.erl => r0mq_sub_service.erl} | 4 +- src/r0mq_sup.erl | 12 ++--- 10 files changed, 72 insertions(+), 26 deletions(-) create mode 100644 src/r0mq_pub_service.erl rename src/{r0mq_push.erl => r0mq_pull_service.erl} (94%) rename src/{r0mq_pull.erl => r0mq_push_service.erl} (94%) rename src/{r0mq_req.erl => r0mq_rep_service.erl} (97%) rename src/{r0mq_rep.erl => r0mq_req_service.erl} (96%) rename src/{r0mq_pub.erl => r0mq_sub_service.erl} (96%) diff --git a/examples/pipeline/rabbitmq.config b/examples/pipeline/rabbitmq.config index 16d75e7..79abd59 100644 --- a/examples/pipeline/rabbitmq.config +++ b/examples/pipeline/rabbitmq.config @@ -1,4 +1,4 @@ [{r0mq, [{services, - [{<<"PIPELINE">>, push, "tcp://127.0.0.1:5557"}, - {<<"PIPELINE">>, pull, "tcp://127.0.0.1:5558"}]}]}]. + [{<<"PIPELINE">>, pull, "tcp://127.0.0.1:5557"}, + {<<"PIPELINE">>, push, "tcp://127.0.0.1:5558"}]}]}]. diff --git a/examples/pubsub/rabbitmq.config b/examples/pubsub/rabbitmq.config index f249f5e..815f32e 100644 --- a/examples/pubsub/rabbitmq.config +++ b/examples/pubsub/rabbitmq.config @@ -1,4 +1,4 @@ [{r0mq, [{services, - [{<<"PUBSUB">>, pub, "tcp://127.0.0.1:5555"}, - {<<"PUBSUB">>, sub, "tcp://127.0.0.1:5556"}]}]}]. + [{<<"PUBSUB">>, sub, "tcp://127.0.0.1:5555"}, + {<<"PUBSUB">>, pub, "tcp://127.0.0.1:5556"}]}]}]. diff --git a/examples/reqrep/rabbitmq.config b/examples/reqrep/rabbitmq.config index d52e3bd..616fbc1 100644 --- a/examples/reqrep/rabbitmq.config +++ b/examples/reqrep/rabbitmq.config @@ -1,4 +1,4 @@ [{r0mq, [{services, - [{<<"REQREP">>, req, "tcp://127.0.0.1:5559"}, - {<<"REQREP">>, rep, "tcp://127.0.0.1:5560"}]}]}]. + [{<<"REQREP">>, rep, "tcp://127.0.0.1:5559"}, + {<<"REQREP">>, req, "tcp://127.0.0.1:5560"}]}]}]. diff --git a/src/r0mq_pub_service.erl b/src/r0mq_pub_service.erl new file mode 100644 index 0000000..5b3c45d --- /dev/null +++ b/src/r0mq_pub_service.erl @@ -0,0 +1,50 @@ +-module(r0mq_pub_service). + +%% A pub service. +%% +%% See http://wiki.github.com/rabbitmq/rmq-0mq/pubsub + +%% Callbacks +-export([init/3, create_socket/0, + start_listening/2, zmq_message/3, amqp_message/5]). + +-include_lib("amqp_client/include/amqp_client.hrl"). + +-record(state, {queue}). + +%% -- Callbacks -- + +create_socket() -> + {ok, Out} = zmq:socket(pub, [{active, true}]), + Out. + +init(Options, Connection, ConsumeChannel) -> + Exchange = case r0mq_util:get_option(name, Options) of + missing -> throw({?MODULE, no_name_supplied, Options}); + Name -> Name + end, + case r0mq_util:ensure_exchange(Exchange, <<"fanout">>, Connection) of + {error, _, _Spec} -> + %io:format("Error declaring exchange ~p~n", [Spec]), + throw({cannot_declare_exchange, Exchange}); + {ok, Exchange} -> + %io:format("Exchange OK ~p~n", [Exchange]), + {ok, Queue} = r0mq_util:create_bind_private_queue( + Exchange, <<"">>, ConsumeChannel), + %io:format("Using queue ~p~n", [Queue]), + {ok, #state{queue = Queue}} + end. + +start_listening(Channel, State = #state{queue = Queue}) -> + Consume = #'basic.consume'{ queue = Queue, + no_ack = true, + exclusive = true }, + amqp_channel:subscribe(Channel, Consume, self()), + {ok, State}. + +zmq_message(Data, _Channel, _State) -> + throw({?MODULE, unexpected_zmq_message, Data}). + +amqp_message(_Env, #amqp_msg{ payload = Payload }, Sock, _Channel, State) -> + zmq:send(Sock, Payload), + {ok, State}. diff --git a/src/r0mq_push.erl b/src/r0mq_pull_service.erl similarity index 94% rename from src/r0mq_push.erl rename to src/r0mq_pull_service.erl index d7f66d8..daf73f9 100644 --- a/src/r0mq_push.erl +++ b/src/r0mq_pull_service.erl @@ -1,6 +1,6 @@ --module(r0mq_push). +-module(r0mq_pull_service). -%% A pipeline push (downstream) service. +%% A pipeline pull (upstream) service. %% %% See http://wiki.github.com/rabbitmq/rmq-0mq/pipeline diff --git a/src/r0mq_pull.erl b/src/r0mq_push_service.erl similarity index 94% rename from src/r0mq_pull.erl rename to src/r0mq_push_service.erl index 5956b97..e148e18 100644 --- a/src/r0mq_pull.erl +++ b/src/r0mq_push_service.erl @@ -1,6 +1,6 @@ --module(r0mq_pull). +-module(r0mq_push_service). -%% A pull (upstream) pipeline service. +%% A pipeline push (downstream) service. %% %% See http://wiki.github.com/rabbitmq/rmq-0mq/pipeline diff --git a/src/r0mq_req.erl b/src/r0mq_rep_service.erl similarity index 97% rename from src/r0mq_req.erl rename to src/r0mq_rep_service.erl index 741bcab..b8b05ce 100644 --- a/src/r0mq_req.erl +++ b/src/r0mq_rep_service.erl @@ -1,8 +1,6 @@ --module(r0mq_req). +-module(r0mq_rep_service). -%% The request service. The socket accepts -%% requests and sends responses (i.e., it is connected to by req -%% sockets. +%% A reply service. %% %% See http://wiki.github.com/rabbitmq/rmq-0mq/reqrep diff --git a/src/r0mq_rep.erl b/src/r0mq_req_service.erl similarity index 96% rename from src/r0mq_rep.erl rename to src/r0mq_req_service.erl index 8b37a51..775b0e1 100644 --- a/src/r0mq_rep.erl +++ b/src/r0mq_req_service.erl @@ -1,8 +1,6 @@ --module(r0mq_rep). +-module(r0mq_req_service). -%% The reply service. The socket is sent requests and -%% accepts replies (i.e., it is connected to by rep -%% sockets. +%% A request service. %% %% See http://wiki.github.com/rabbitmq/rmq-0mq/reqrep diff --git a/src/r0mq_pub.erl b/src/r0mq_sub_service.erl similarity index 96% rename from src/r0mq_pub.erl rename to src/r0mq_sub_service.erl index d11b046..d2ca640 100644 --- a/src/r0mq_pub.erl +++ b/src/r0mq_sub_service.erl @@ -1,6 +1,6 @@ --module(r0mq_pub). +-module(r0mq_sub_service). -%% A pub service. +%% A sub service. %% %% See http://wiki.github.com/rabbitmq/rmq-0mq/pubsub diff --git a/src/r0mq_sup.erl b/src/r0mq_sup.erl index 268a363..5868bb4 100644 --- a/src/r0mq_sup.erl +++ b/src/r0mq_sup.erl @@ -60,12 +60,12 @@ child_spec(S = {Rendezvous, Type, Address, Options}) -> end end. -module_for_type(pub) -> r0mq_pub; -module_for_type(sub) -> r0mq_sub; -module_for_type(req) -> r0mq_req; -module_for_type(rep) -> r0mq_rep; -module_for_type(push) -> r0mq_push; -module_for_type(pull) -> r0mq_pull; +module_for_type(pub) -> r0mq_pub_service; +module_for_type(sub) -> r0mq_sub_service; +module_for_type(req) -> r0mq_req_service; +module_for_type(rep) -> r0mq_rep_service; +module_for_type(push) -> r0mq_push_service; +module_for_type(pull) -> r0mq_pull_service; module_for_type(_) -> no_such_type. full_address(Address) when is_list(Address) ->