Permalink
Browse files

Code for section 5 of the tutorial.

  • Loading branch information...
janimo committed Jul 20, 2011
1 parent 789bb96 commit ba4d5ae97941161a429d2eb62d5d76408704da60
Showing with 64 additions and 0 deletions.
  1. +27 −0 erlang/emit_log_topic.erl
  2. +37 −0 erlang/receive_logs_topic.erl
View
@@ -0,0 +1,27 @@
+#!/usr/bin/env escript
+%%! -pz ./amqp_client ./rabbit_common ./amqp_client/ebin ./rabbit_common/ebin
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+main(Argv) ->
+ {ok, Connection} =
+ amqp_connection:start(#amqp_params_network{host = "localhost"}),
+ {ok, Channel} = amqp_connection:open_channel(Connection),
+
+ amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"topic_logs">>,
+ type = <<"topic">>}),
+
+ {RoutingKey, Message} = case Argv of
+ [] -> {<<"anonymous.info">>, <<"Hello World!">>};
+ [R] -> {list_to_binary(R), <<"Hello World!">>};
+ [R | Msg] -> {list_to_binary(R), list_to_binary(string:join(Msg, " "))}
+ end,
+ amqp_channel:cast(Channel,
+ #'basic.publish'{
+ exchange = <<"topic_logs">>,
+ routing_key = RoutingKey},
+ #amqp_msg{payload = Message}),
+ io:format(" [x] Sent ~p:~p~n", [RoutingKey, Message]),
+ ok = amqp_channel:close(Channel),
+ ok = amqp_connection:close(Connection),
+ ok.
@@ -0,0 +1,37 @@
+#!/usr/bin/env escript
+%%! -pz ./amqp_client ./rabbit_common ./amqp_client/ebin ./rabbit_common/ebin
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+main(Argv) ->
+ {ok, Connection} =
+ amqp_connection:start(#amqp_params_network{host = "localhost"}),
+ {ok, Channel} = amqp_connection:open_channel(Connection),
+
+ amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"topic_logs">>,
+ type = <<"topic">>}),
+
+ #'queue.declare_ok'{queue = Queue} =
+ amqp_channel:call(Channel, #'queue.declare'{exclusive = true}),
+
+ lists:foreach(fun(R) ->
+ amqp_channel:call(Channel, #'queue.bind'{exchange = <<"topic_logs">>,
+ routing_key = list_to_binary(R),
+ queue = Queue})
+ end, Argv),
+
+ io:format(" [*] Waiting for logs. To exit press CTRL+C~n"),
+
+ amqp_channel:subscribe(Channel, #'basic.consume'{queue = Queue,
+ no_ack = true}, self()),
+ receive
+ #'basic.consume_ok'{} -> ok
+ end,
+ loop(Channel).
+
+loop(Channel) ->
+ receive
+ {#'basic.deliver'{routing_key=RoutingKey}, #amqp_msg{payload = Body}} ->
+ io:format(" [x] ~p:~p~n", [RoutingKey, Body]),
+ loop(Channel)
+ end.

0 comments on commit ba4d5ae

Please sign in to comment.