Skip to content
Browse files

Merge remote-tracking branch 'remotes/janimo/erlang'

  • Loading branch information...
2 parents 05a8c3d + ba47e9a commit cb0f9a5b00965619255ff2c790212eda2fcdd68d @majek majek committed Jul 20, 2011
Showing with 138 additions and 0 deletions.
  1. +10 −0 erlang/README.md
  2. +27 −0 erlang/emit_log_direct.erl
  3. +27 −0 erlang/emit_log_topic.erl
  4. +37 −0 erlang/receive_logs_direct.erl
  5. +37 −0 erlang/receive_logs_topic.erl
View
10 erlang/README.md
@@ -42,3 +42,13 @@ You need Erlang Client binaries:
./receive_logs.erl
./emit_log.erl "info: This is the log message"
+
+[Tutorial four: Routing](http://www.rabbitmq.com/tutorial-four-python.html):
+
+ ./receive_logs_direct.erl info
+ ./emit_log_direct.py info Hello
+
+[Tutorial five: Topics](http://www.rabbitmq.com/tutorial-five-python.html):
+
+ ./receive_logs_topic.erl *.rabbit
+ ./emit_log_topic.erl red.rabbit Hello
View
27 erlang/emit_log_direct.erl
@@ -0,0 +1,27 @@
+#!/usr/bin/env escript
+%%! -pz ./amqp_client ./rabbit_common ./amqp_client/ebin ./rabbit_common/ebin
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+main(Argv) ->
+ {ok, Connection} =
+ amqp_connection:start(#amqp_params_network{host = "localhost"}),
+ {ok, Channel} = amqp_connection:open_channel(Connection),
+
+ amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"direct_logs">>,
+ type = <<"direct">>}),
+
+ {Severity, Message} = case Argv of
+ [] -> {<<"info">>, <<"Hello World!">>};
+ [S] -> {list_to_binary(S), <<"Hello World!">>};
+ [S | Msg] -> {list_to_binary(S), list_to_binary(string:join(Msg, " "))}
+ end,
+ amqp_channel:cast(Channel,
+ #'basic.publish'{
+ exchange = <<"direct_logs">>,
+ routing_key = Severity},
+ #amqp_msg{payload = Message}),
+ io:format(" [x] Sent ~p:~p~n", [Severity, Message]),
+ ok = amqp_channel:close(Channel),
+ ok = amqp_connection:close(Connection),
+ ok.
View
27 erlang/emit_log_topic.erl
@@ -0,0 +1,27 @@
+#!/usr/bin/env escript
+%%! -pz ./amqp_client ./rabbit_common ./amqp_client/ebin ./rabbit_common/ebin
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+main(Argv) ->
+ {ok, Connection} =
+ amqp_connection:start(#amqp_params_network{host = "localhost"}),
+ {ok, Channel} = amqp_connection:open_channel(Connection),
+
+ amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"topic_logs">>,
+ type = <<"topic">>}),
+
+ {RoutingKey, Message} = case Argv of
+ [] -> {<<"anonymous.info">>, <<"Hello World!">>};
+ [R] -> {list_to_binary(R), <<"Hello World!">>};
+ [R | Msg] -> {list_to_binary(R), list_to_binary(string:join(Msg, " "))}
+ end,
+ amqp_channel:cast(Channel,
+ #'basic.publish'{
+ exchange = <<"topic_logs">>,
+ routing_key = RoutingKey},
+ #amqp_msg{payload = Message}),
+ io:format(" [x] Sent ~p:~p~n", [RoutingKey, Message]),
+ ok = amqp_channel:close(Channel),
+ ok = amqp_connection:close(Connection),
+ ok.
View
37 erlang/receive_logs_direct.erl
@@ -0,0 +1,37 @@
+#!/usr/bin/env escript
+%%! -pz ./amqp_client ./rabbit_common ./amqp_client/ebin ./rabbit_common/ebin
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+main(Argv) ->
+ {ok, Connection} =
+ amqp_connection:start(#amqp_params_network{host = "localhost"}),
+ {ok, Channel} = amqp_connection:open_channel(Connection),
+
+ amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"direct_logs">>,
+ type = <<"direct">>}),
+
+ #'queue.declare_ok'{queue = Queue} =
+ amqp_channel:call(Channel, #'queue.declare'{exclusive = true}),
+
+ lists:foreach(fun(S) ->
+ amqp_channel:call(Channel, #'queue.bind'{exchange = <<"direct_logs">>,
+ routing_key = list_to_binary(S),
+ queue = Queue})
+ end, Argv),
+
+ io:format(" [*] Waiting for logs. To exit press CTRL+C~n"),
+
+ amqp_channel:subscribe(Channel, #'basic.consume'{queue = Queue,
+ no_ack = true}, self()),
+ receive
+ #'basic.consume_ok'{} -> ok
+ end,
+ loop(Channel).
+
+loop(Channel) ->
+ receive
+ {#'basic.deliver'{routing_key=RoutingKey}, #amqp_msg{payload = Body}} ->
+ io:format(" [x] ~p:~p~n", [RoutingKey, Body]),
+ loop(Channel)
+ end.
View
37 erlang/receive_logs_topic.erl
@@ -0,0 +1,37 @@
+#!/usr/bin/env escript
+%%! -pz ./amqp_client ./rabbit_common ./amqp_client/ebin ./rabbit_common/ebin
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+main(Argv) ->
+ {ok, Connection} =
+ amqp_connection:start(#amqp_params_network{host = "localhost"}),
+ {ok, Channel} = amqp_connection:open_channel(Connection),
+
+ amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"topic_logs">>,
+ type = <<"topic">>}),
+
+ #'queue.declare_ok'{queue = Queue} =
+ amqp_channel:call(Channel, #'queue.declare'{exclusive = true}),
+
+ lists:foreach(fun(R) ->
+ amqp_channel:call(Channel, #'queue.bind'{exchange = <<"topic_logs">>,
+ routing_key = list_to_binary(R),
+ queue = Queue})
+ end, Argv),
+
+ io:format(" [*] Waiting for logs. To exit press CTRL+C~n"),
+
+ amqp_channel:subscribe(Channel, #'basic.consume'{queue = Queue,
+ no_ack = true}, self()),
+ receive
+ #'basic.consume_ok'{} -> ok
+ end,
+ loop(Channel).
+
+loop(Channel) ->
+ receive
+ {#'basic.deliver'{routing_key=RoutingKey}, #amqp_msg{payload = Body}} ->
+ io:format(" [x] ~p:~p~n", [RoutingKey, Body]),
+ loop(Channel)
+ end.

0 comments on commit cb0f9a5

Please sign in to comment.
Something went wrong with that request. Please try again.