Permalink
Browse files

subscribe as separate bridge module

  • Loading branch information...
1 parent c4f7135 commit da24b4e6efc935aa5909eefca8e80a1839f80c7f @mabrek committed Feb 26, 2011
Showing with 77 additions and 45 deletions.
  1. +1 −1 include/rabbit_redis.hrl
  2. +40 −0 src/rabbit_redis_subscribe.erl
  3. +20 −0 src/rabbit_redis_util.erl
  4. +16 −44 src/rabbit_redis_worker.erl
View
@@ -1,2 +1,2 @@
-record(worker_state, {redis_connection, rabbit_connection, rabbit_channel,
- set_publish_fields, publish_properties, config}).
+ bridge_module, bridge_state, config}).
@@ -0,0 +1,40 @@
+-module(rabbit_redis_subscribe).
+
+-include(rabbit_redis.hrl).
+
+-record(subscribe_state, {set_publish_fields, publish_properties}).
+
+-export([init/1, handle_info/2]).
+
+init(WorkerState = #worker_state{bridge_module = ?MODULE,
+ redis_connection = RedisConnection,
+ config = Config}) ->
+ % TODO navigate subproperties via rabbit_redis_util
+ [erldis:subscribe(RedisConnection, C, self())
+ || C <- proplists:get_value(channels,
+ proplists:get_value(redis, Config))],
+
+ RabbitConfig = proplists:get_value(rabbit, Config),
+ PublishFields = proplists:get_value(publish_fields, RabbitConfig),
+ SetPublishFields = fun(Method) ->
+ rabbit_redis_util:set_fields(
+ PublishFields,
+ record_info(fields, 'basic.publish'),
+ Method)
+ end,
+ PublishProperties = rabbit_redis_util:set_fields(
+ proplists:get_value(publish_properties, RabbitConfig),
+ record_info(fields, 'P_basic'),
+ #'P_basic'{}),
+ State#worker_state{
+ bridge_state = #subscribe_state{set_publish_fields = SetPublishFields,
+ publish_properties = PublishProperties}}.
+
+handle_info({message, Channel, Payload},
+ State = #worker_state{bridge_state = #subscribe_state{
+ set_publish_fields = SetPublishFields,
+ publish_properties = PublishProperties}}) ->
+ Method = (SetPublishFields)(#'basic.publish'{routing_key = Channel}),
+ Message = #amqp_msg{payload = Payload, props = PublishProperties},
+ amqp_channel:call(State#worker_state.rabbit_channel, Method, Message),
+ {noreply, State};
View
@@ -0,0 +1,20 @@
+-module(rabbit_redis_util).
+
+-export([set_fields]).
+
+set_fields(undefined, _, Record) ->
+ Record;
+
+set_fields(Props, Names, Record) ->
+ {IndexedNames, _Idx} = lists:foldl(
+ fun(Name, {Dict, Idx}) ->
+ {dict:store(Name, Idx, Dict),
+ Idx + 1}
+ end,
+ {dict:new(), 2},
+ Names),
+ lists:foldl(fun({K, V}, R) ->
+ setelement(dict:fetch(K, IndexedNames), R, V)
+ end,
+ Record,
+ proplists:unfold(Props)).
@@ -29,8 +29,6 @@ handle_cast(init, State = #worker_state{config = Config}) ->
{ok, RedisConnection} = erldis_client:start_link(
proplists:get_value(host, RedisConfig),
proplists:get_value(port, RedisConfig)),
- [erldis:subscribe(RedisConnection, C, self())
- || C <- proplists:get_value(channels, RedisConfig)],
RabbitConfig = proplists:get_value(rabbit, Config),
{ok, RabbitConnection} = amqp_connection:start(direct),
@@ -40,37 +38,24 @@ handle_cast(init, State = #worker_state{config = Config}) ->
Method <- resource_declarations(
proplists:get_value(declarations, RabbitConfig))],
- PublishFields = proplists:get_value(publish_fields, RabbitConfig),
- SetPublishFields = fun(Method) ->
- set_fields(PublishFields,
- record_info(fields, 'basic.publish'),
- Method)
- end,
- PublishProperties = set_fields(
- proplists:get_value(publish_properties, RabbitConfig),
- record_info(fields, 'P_basic'),
- #'P_basic'{}),
- {noreply, State#worker_state{redis_connection = RedisConnection,
- rabbit_connection = RabbitConnection,
- rabbit_channel = RabbitChannel,
- set_publish_fields = SetPublishFields,
- publish_properties = PublishProperties}}.
-
-handle_info({message, Channel, Payload}, State) ->
- Method = #'basic.publish'{routing_key = Channel},
- Method1 = (State#worker_state.set_publish_fields)(Method),
- Message = #amqp_msg{payload = Payload,
- props = State#worker_state.publish_properties},
- amqp_channel:call(State#worker_state.rabbit_channel, Method1, Message),
- {noreply, State};
+ BridgeModule = module_for_type(proplists:get_value(type, Config)),
+ State1 = BridgeModule:init(
+ State#worker_state{redis_connection = RedisConnection,
+ rabbit_connection = RabbitConnection,
+ rabbit_channel = RabbitChannel
+ bridge_module = BridgeModule}),
+ {noreply, State1}}.
handle_info({'EXIT', RabbitConnection, Reason},
State = #worker_state{rabbit_connection = RabbitConnection}) ->
{stop, {rabbit_connection_died, Reason}, State};
handle_info({'EXIT', RedisConnection, Reason},
State = #worker_state{redis_connection = RedisConnection}) ->
- {stop, {redis_connection_died, Reason}, State}.
+ {stop, {redis_connection_died, Reason}, State};
+
+handle_info(Message, State = #worker_state{ bridge_module = BridgeModule}) ->
+ BridgeModule:handle_info(Message, State).
terminate(_Reason, State) ->
catch erldis:quit(State#worker_state.redis_connection),
@@ -83,30 +68,17 @@ code_change(_OldVsn, State, _Extra) ->
%% internals
+module_for_type(subscribe) -> rabbit_redis_subscribe.
+
resource_declarations(Declarations) ->
resource_declarations(Declarations, []).
resource_declarations([{Method, Props} | Rest], Acc) ->
Names = rabbit_framing_amqp_0_9_1:method_fieldnames(Method),
Record = rabbit_framing_amqp_0_9_1:method_record(Method),
- resource_declarations(Rest, [set_fields(Props, Names, Record) | Acc]);
+ resource_declarations(
+ Rest,
+ [rabbit_redis_util:set_fields(Props, Names, Record) | Acc]);
resource_declarations([], Acc) ->
Acc.
-
-set_fields(undefined, _, Record) ->
- Record;
-
-set_fields(Props, Names, Record) ->
- {IndexedNames, _Idx} = lists:foldl(
- fun(Name, {Dict, Idx}) ->
- {dict:store(Name, Idx, Dict),
- Idx + 1}
- end,
- {dict:new(), 2},
- Names),
- lists:foldl(fun({K, V}, R) ->
- setelement(dict:fetch(K, IndexedNames), R, V)
- end,
- Record,
- proplists:unfold(Props)).

0 comments on commit da24b4e

Please sign in to comment.