Permalink
Browse files

Major chunk of the subscription implementation.

  • Loading branch information...
tonyg committed Jun 4, 2009
1 parent 0204de7 commit dfd9cba5d41d55bb4b4d4f9d38414139a4fc2d28
View
@@ -0,0 +1,163 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-record(user, {username, password}).
+-record(permission, {configure, write, read}).
+-record(user_vhost, {username, virtual_host}).
+-record(user_permission, {user_vhost, permission}).
+
+-record(vhost, {virtual_host, dummy}).
+
+-record(connection, {user, timeout_sec, frame_max, vhost}).
+
+-record(content,
+ {class_id,
+ properties, %% either 'none', or a decoded record/tuple
+ properties_bin, %% either 'none', or an encoded properties binary
+ %% Note: at most one of properties and properties_bin can be
+ %% 'none' at once.
+ payload_fragments_rev %% list of binaries, in reverse order (!)
+ }).
+
+-record(resource, {virtual_host, kind, name}).
+
+-record(exchange, {name, type, durable, auto_delete, arguments}).
+
+-record(amqqueue, {name, durable, auto_delete, arguments, pid}).
+
+%% mnesia doesn't like unary records, so we add a dummy 'value' field
+-record(route, {binding, value = const}).
+-record(reverse_route, {reverse_binding, value = const}).
+
+-record(binding, {exchange_name, key, queue_name, args = []}).
+-record(reverse_binding, {queue_name, key, exchange_name, args = []}).
+
+-record(listener, {node, protocol, host, port}).
+
+-record(basic_message, {exchange_name, routing_key, content, persistent_key}).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-include("rabbit_framing_spec.hrl").
+
+-type(maybe(T) :: T | 'none').
+-type(erlang_node() :: atom()).
+-type(socket() :: port()).
+-type(thunk(T) :: fun(() -> T)).
+-type(info_key() :: atom()).
+-type(info() :: {info_key(), any()}).
+-type(regexp() :: binary()).
+
+%% this is really an abstract type, but dialyzer does not support them
+-type(guid() :: any()).
+-type(txn() :: guid()).
+-type(pkey() :: guid()).
+-type(r(Kind) ::
+ #resource{virtual_host :: vhost(),
+ kind :: Kind,
+ name :: resource_name()}).
+-type(queue_name() :: r('queue')).
+-type(exchange_name() :: r('exchange')).
+-type(user() ::
+ #user{username :: username(),
+ password :: password()}).
+-type(permission() ::
+ #permission{configure :: regexp(),
+ write :: regexp(),
+ read :: regexp()}).
+-type(amqqueue() ::
+ #amqqueue{name :: queue_name(),
+ durable :: bool(),
+ auto_delete :: bool(),
+ arguments :: amqp_table(),
+ pid :: maybe(pid())}).
+-type(exchange() ::
+ #exchange{name :: exchange_name(),
+ type :: exchange_type(),
+ durable :: bool(),
+ auto_delete :: bool(),
+ arguments :: amqp_table()}).
+-type(binding() ::
+ #binding{exchange_name :: exchange_name(),
+ queue_name :: queue_name(),
+ key :: binding_key()}).
+%% TODO: make this more precise by tying specific class_ids to
+%% specific properties
+-type(undecoded_content() ::
+ #content{class_id :: amqp_class_id(),
+ properties :: 'none',
+ properties_bin :: binary(),
+ payload_fragments_rev :: [binary()]} |
+ #content{class_id :: amqp_class_id(),
+ properties :: amqp_properties(),
+ properties_bin :: 'none',
+ payload_fragments_rev :: [binary()]}).
+-type(decoded_content() ::
+ #content{class_id :: amqp_class_id(),
+ properties :: amqp_properties(),
+ properties_bin :: maybe(binary()),
+ payload_fragments_rev :: [binary()]}).
+-type(content() :: undecoded_content() | decoded_content()).
+-type(basic_message() ::
+ #basic_message{exchange_name :: exchange_name(),
+ routing_key :: routing_key(),
+ content :: content(),
+ persistent_key :: maybe(pkey())}).
+-type(message() :: basic_message()).
+%% this really should be an abstract type
+-type(msg_id() :: non_neg_integer()).
+-type(msg() :: {queue_name(), pid(), msg_id(), bool(), message()}).
+-type(listener() ::
+ #listener{node :: erlang_node(),
+ protocol :: atom(),
+ host :: string() | atom(),
+ port :: non_neg_integer()}).
+-type(not_found() :: {'error', 'not_found'}).
+-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2009 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.").
+-define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/").
+
+-ifdef(debug).
+-define(LOGDEBUG0(F), rabbit_log:debug(F)).
+-define(LOGDEBUG(F,A), rabbit_log:debug(F,A)).
+-define(LOGMESSAGE(D,C,M,Co), rabbit_log:message(D,C,M,Co)).
+-else.
+-define(LOGDEBUG0(F), ok).
+-define(LOGDEBUG(F,A), ok).
+-define(LOGMESSAGE(D,C,M,Co), ok).
+-endif.
View
@@ -1,4 +1,3 @@
--record(rabbithub_subscription, {queue, callback}).
+-record(rabbithub_subscription, {resource, topic, callback}).
-%% RabbitMQ resource
--record(resource, {virtual_host, kind, name}).
+-record(rabbithub_subscription_pid, {subscription, pid}).
View
@@ -36,7 +36,13 @@ setup_schema() ->
ok = mnesia:start(),
ok = create_table(rabbithub_subscription,
[{attributes, record_info(fields, rabbithub_subscription)},
+ {type, bag},
{disc_copies, [node()]}]),
+ ok = create_table(rabbithub_subscription_pid,
+ [{attributes, record_info(fields, rabbithub_subscription_pid)}]),
+ ok = mnesia:wait_for_tables([rabbithub_subscription,
+ rabbithub_subscription_pid],
+ 5000),
ok.
create_table(Name, Params) ->
View
@@ -0,0 +1,100 @@
+-module(rabbithub_consumer).
+
+-behaviour(gen_server).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+
+-include("rabbithub.hrl").
+-include("rabbit.hrl").
+
+-record(state, {subscription, q_monitor_ref, consumer_tag}).
+
+init([Subscription]) ->
+ error_logger:info_report({starting_consumer, Subscription}),
+
+ process_flag(trap_exit, true),
+ case rabbithub_subscription:register_subscription_pid(Subscription, self()) of
+ ok ->
+ really_init(Subscription);
+ duplicate ->
+ {stop, normal}
+ end.
+
+really_init(Subscription = #rabbithub_subscription{resource = Resource}) ->
+ case rabbithub:rabbit_call(rabbit_amqqueue, lookup, [Resource]) of
+ {ok, Q = #amqqueue{pid = QPid}} ->
+ ConsumerTag = rabbithub:binstring_guid("amq.http.consumer"),
+ MonRef = erlang:monitor(process, QPid),
+ rabbithub:rabbit_call(rabbit_amqqueue, basic_consume,
+ [Q, false, self(), self(), undefined,
+ ConsumerTag, false, undefined]),
+ {ok, #state{subscription = Subscription,
+ q_monitor_ref = MonRef,
+ consumer_tag = ConsumerTag}};
+ {error, not_found} ->
+ {stop, queue_not_found}
+ end.
+
+error_and_delete_self(ErrorReport, State = #state{subscription = Subscription}) ->
+ error_logger:error_report(ErrorReport),
+ rabbithub_subscription:delete(Subscription),
+ {noreply, State}.
+
+handle_call(Request, _From, State) ->
+ {stop, {unhandled_call, Request}, State}.
+
+handle_cast({deliver, _ConsumerTag, AckRequired,
+ {#resource{name = _QNameBin}, QPid, MsgId, Redelivered,
+ #basic_message{exchange_name = #resource{name = _ExchangeNameBin},
+ routing_key = RoutingKeyBin,
+ content = #content{payload_fragments_rev = PayloadRev}}}},
+ State = #state{subscription = #rabbithub_subscription{topic = TopicStr,
+ callback = CallbackStr}}) ->
+ ExtraQuery =
+ lists:flatten(io_lib:format("hub.topic=~s", [TopicStr])),
+ %% FIXME: get content properties out in some clean way
+ PayloadBin = list_to_binary(lists:reverse(PayloadRev)),
+ case simple_httpc:req("POST",
+ CallbackStr,
+ ExtraQuery,
+ [{"Content-length", integer_to_list(size(PayloadBin))},
+ {"X-AMQP-Routing-Key", RoutingKeyBin},
+ {"X-AMQP-Redelivered", atom_to_list(Redelivered)}],
+ PayloadBin) of
+ {ok, StatusCode, _StatusText, _Headers, _Body} ->
+ if
+ StatusCode >= 200 andalso StatusCode < 300 ->
+ ok = rabbithub:rabbit_call(rabbit_amqqueue, notify_sent, [QPid, self()]),
+ case AckRequired of
+ true ->
+ ok = rabbithub:rabbit_call(rabbit_amqqueue, ack,
+ [QPid, none, [MsgId], self()]);
+ false ->
+ ok
+ end,
+ {noreply, State};
+ true ->
+ error_and_delete_self({rabbithub_consumer,
+ http_post_unexpected_status,
+ StatusCode}, State)
+ end;
+ {error, Reason} ->
+ error_and_delete_self({rabbithub_consumer,
+ http_post_failure,
+ Reason}, State)
+ end;
+handle_cast(shutdown, State) ->
+ {stop, normal, State};
+handle_cast(Request, State) ->
+ {stop, {unhandled_cast, Request}, State}.
+
+handle_info(Request, State) ->
+ {stop, {unhandled_info, Request}, State}.
+
+terminate(_Reason, _State = #state{subscription = Subscription}) ->
+ error_logger:info_report({stopping_consumer, _Reason, _State}),
+ ok = rabbithub_subscription:erase_subscription_pid(Subscription),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
@@ -0,0 +1,110 @@
+-module(rabbithub_pseudo_queue).
+
+-behaviour(gen_server).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+
+-include("rabbithub.hrl").
+
+-record(state, {subscription, rabbit_monitor_ref, queue_name}).
+
+init([Subscription]) ->
+ error_logger:info_report({starting_pseudo_queue, Subscription}),
+
+ process_flag(trap_exit, true),
+ case rabbithub_subscription:register_subscription_pid(Subscription, self()) of
+ ok ->
+ really_init(Subscription);
+ duplicate ->
+ {stop, normal}
+ end.
+
+really_init(Subscription = #rabbithub_subscription{resource = Resource,
+ topic = Topic}) ->
+ QueueName = rabbithub:r(queue, rabbithub:binstring_guid("amq.http.pseudoqueue")),
+ Q = rabbithub:rabbit_call(rabbit_amqqueue, pseudo_queue,
+ [QueueName, self()]),
+ Q = rabbithub:rabbit_call(rabbit_amqqueue, internal_declare,
+ [Q, false]),
+ case rabbithub:rabbit_call(rabbit_exchange, add_binding,
+ [Resource, QueueName, list_to_binary(Topic), []]) of
+ ok ->
+ RabbitPid = rabbithub:rabbit_call(erlang, whereis, [rabbit_sup]),
+ MonRef = erlang:monitor(process, RabbitPid),
+ {ok, #state{subscription = Subscription,
+ rabbit_monitor_ref = MonRef,
+ queue_name = QueueName}};
+ {error, exchange_not_found} ->
+ {stop, exchange_not_found}
+ end.
+
+handle_call(Request, _From, State) ->
+ {stop, {unhandled_call, Request}, State}.
+
+handle_cast(Request, State) ->
+ {stop, {unhandled_cast, Request}, State}.
+
+handle_info(Request, State) ->
+ {stop, {unhandled_info, Request}, State}.
+
+terminate(_Reason, _State = #state{subscription = Subscription,
+ queue_name = QueueName}) ->
+ error_logger:info_report({stopping_pseudo_queue, _Reason, _State}),
+ ok = rabbithub_subscription:erase_subscription_pid(Subscription),
+ rabbithub:rabbit_call(rabbit_amqqueue, internal_delete, [QueueName]),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% receive
+%% {unavailable, JID, RKBin, AllResources} ->
+%% {atomic, NewState} =
+%% mnesia:transaction(
+%% fun () ->
+%% NewPriorities =
+%% case AllResources of
+%% true ->
+%% [E || E = {_, {J, _}} <- Priorities,
+%% not jids_equal_upto_resource(J, JID)];
+%% false ->
+%% lists:keydelete({JID, RKBin}, 2, Priorities)
+%% end,
+%% case NewPriorities of
+%% [] ->
+%% mnesia:delete({rabbitmq_consumer_process,
+%% State#consumer_state.queue}),
+%% terminate;
+%% _ ->
+%% State#consumer_state{priorities = NewPriorities}
+%% end
+%% end),
+%% case NewState of
+%% terminate ->
+%% ?INFO_MSG("**** terminating consumer~n~p", [State#consumer_state.queue]),
+%% consumer_done(State#consumer_state{priorities = []}),
+%% done;
+%% _ ->
+%% ?MODULE:consumer_main(NewState)
+%% end;
+%% {presence, JID, RKBin, Priority} ->
+%% NewPriorities = lists:keysort(1, keystore({JID, RKBin}, 2, Priorities,
+%% {-Priority, {JID, RKBin}})),
+%% ?MODULE:consumer_main(State#consumer_state{priorities = NewPriorities});
+%% {'$gen_cast', {deliver, _ConsumerTag, false, {_QName, QPid, _Id, _Redelivered, Msg}}} ->
+%% #basic_message{exchange_name = #resource{name = XNameBin},
+%% routing_key = RKBin,
+%% content = #content{payload_fragments_rev = PayloadRev}} = Msg,
+%% [{_, {TopPriorityJID, _}} | _] = Priorities,
+%% send_message(jlib:make_jid(binary_to_list(XNameBin),
+%% State#consumer_state.lserver,
+%% binary_to_list(RKBin)),
+%% TopPriorityJID,
+%% "chat",
+%% binary_to_list(list_to_binary(lists:reverse(PayloadRev)))),
+%% rabbit_amqqueue:notify_sent(QPid, self()),
+%% ?MODULE:consumer_main(State);
+%% Other ->
+%% ?INFO_MSG("Consumer main ~p got~n~p", [State#consumer_state.queue, Other]),
+%% ?MODULE:consumer_main(State)
+%% end
Oops, something went wrong.

0 comments on commit dfd9cba

Please sign in to comment.