Skip to content

Commit

Permalink
extracts rabbit/redis connection setup/teardown
Browse files Browse the repository at this point in the history
  • Loading branch information
mabrek committed Feb 26, 2011
1 parent 6117435 commit 59f1182
Showing 1 changed file with 31 additions and 21 deletions.
52 changes: 31 additions & 21 deletions test/rabbit_redis_test.erl
Expand Up @@ -8,6 +8,7 @@
-define(REDIS_PORT, 6379).
-define(CHANNEL, <<"channel">>).
-define(EXCHANGE, <<"test_exchange">>).
-define(QUEUE, <<"test_queue">>).
-define(TIMEOUT, 1000).

test() ->
Expand Down Expand Up @@ -36,27 +37,23 @@ redis_only_pubsub() ->
ok.

empty_config() ->
with_application([], fun() -> ok end).
with_configuration([], fun() -> ok end).

subscribe() ->
with_application([[{type, subscribe},
with_configuration([[{type, subscribe},
{redis, [{host, ?REDIS_HOST},
{port, ?REDIS_PORT},
{channels, [?CHANNEL]}
]},
{channels, [?CHANNEL]}]},
{rabbit, [{declarations, [{'exchange.declare',
[{exchange, ?EXCHANGE},
auto_delete
]}]},
{publish_fields, [{exchange, ?EXCHANGE}]}
]}]],
fun subscribe_fun/0).
{publish_fields, [{exchange, ?EXCHANGE}]}]}]],
fun() -> with_rabbit_redis(subscribe_fun/2) end).

subscribe_fun() ->
subscribe_fun(Channel, Redis) ->
pong = gen_server2:call(rabbit_redis_worker, ping), % ensure started

{ok, Rabbit} = amqp_connection:start(direct),
{ok, Channel} = amqp_connection:open_channel(Rabbit),
#'queue.declare_ok'{ queue = Q } =
amqp_channel:call(Channel, #'queue.declare'{exclusive = true}),
#'queue.bind_ok'{} =
Expand All @@ -72,28 +69,41 @@ subscribe_fun() ->
after ?TIMEOUT -> throw(timeout)
end,

{ok, Redis} = erldis:connect(?REDIS_HOST, ?REDIS_PORT),
1 = erldis:publish(Redis, ?CHANNEL, <<"payload">>),
erldis:quit(Redis),

receive
{#'basic.deliver'{ consumer_tag = CTag, exchange = ?EXCHANGE,
routing_key = ?CHANNEL},
#amqp_msg{ payload = <<"payload">> }} ->
ok
receive {#'basic.deliver'{ consumer_tag = CTag, exchange = ?EXCHANGE,
routing_key = ?CHANNEL},
#amqp_msg{ payload = <<"payload">> }} -> ok
after ?TIMEOUT -> throw(timeout)
end,

amqp_channel:close(Channel),
amqp_connection:close(Rabbit),
ok.

publish() ->
with_configuration([[{type, publish},
{redis, [{host, ?REDIS_HOST},
{port, ?REDIS_PORT}]},
{rabbit, [{declarations, [{'queue.declare',
[{queue = ?QUEUE},
auto_delete]}]},
{queue, ?QUEUE}]}]],
fun publish_fun/0).

publish_fun() ->
ok.

with_application(Config, Fun) ->
with_configuration(Config, Fun) ->
application:set_env(rabbit_redis, bridges, Config),
ok = application:start(rabbit_redis),
Fun(),
ok = application:stop(rabbit_redis).

with_rabbit_redis(Fun) ->
{ok, Rabbit} = amqp_connection:start(direct),
{ok, Channel} = amqp_connection:open_channel(Rabbit),
{ok, Redis} = erldis:connect(?REDIS_HOST, ?REDIS_PORT),
Fun(Channel, Redis),
erldis:quit(Redis),
amqp_channel:close(Channel),
amqp_connection:close(Rabbit),
ok.

0 comments on commit 59f1182

Please sign in to comment.