Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

made from rabbitmq-erlang-client rabbitmq_v2_1_0

  • Loading branch information...
commit c3e4b23aa87252590970abcb89b0bf7117ae57f0 0 parents
David Reid authored
2  .gitignore
@@ -0,0 +1,2 @@
+deps
+ebin
98 include/rabbit.hrl
@@ -0,0 +1,98 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-record(user, {username, password, is_admin}).
+-record(permission, {scope, configure, write, read}).
+-record(user_vhost, {username, virtual_host}).
+-record(user_permission, {user_vhost, permission}).
+
+-record(vhost, {virtual_host, dummy}).
+
+-record(connection, {protocol, user, timeout_sec, frame_max, vhost,
+ client_properties}).
+
+-record(content,
+ {class_id,
+ properties, %% either 'none', or a decoded record/tuple
+ properties_bin, %% either 'none', or an encoded properties binary
+ %% Note: at most one of properties and properties_bin can be
+ %% 'none' at once.
+ protocol, %% The protocol under which properties_bin was encoded
+ payload_fragments_rev %% list of binaries, in reverse order (!)
+ }).
+
+-record(resource, {virtual_host, kind, name}).
+
+-record(exchange, {name, type, durable, auto_delete, arguments}).
+
+-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
+ arguments, pid}).
+
+%% mnesia doesn't like unary records, so we add a dummy 'value' field
+-record(route, {binding, value = const}).
+-record(reverse_route, {reverse_binding, value = const}).
+
+-record(binding, {exchange_name, key, queue_name, args = []}).
+-record(reverse_binding, {queue_name, key, exchange_name, args = []}).
+
+-record(listener, {node, protocol, host, port}).
+
+-record(basic_message, {exchange_name, routing_key, content, guid,
+ is_persistent}).
+
+-record(ssl_socket, {tcp, ssl}).
+-record(delivery, {mandatory, immediate, txn, sender, message}).
+-record(amqp_error, {name, explanation, method = none}).
+
+-record(event, {type, props, timestamp}).
+
+%%----------------------------------------------------------------------------
+
+-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.").
+-define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/").
+-define(PROTOCOL_VERSION, "AMQP 0-9-1 / 0-9 / 0-8").
+-define(ERTS_MINIMUM, "5.6.3").
+
+-define(MAX_WAIT, 16#ffffffff).
+
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+-define(STATS_INTERVAL, 5000).
+
+-ifdef(debug).
+-define(LOGDEBUG0(F), rabbit_log:debug(F)).
+-define(LOGDEBUG(F,A), rabbit_log:debug(F,A)).
+-define(LOGMESSAGE(D,C,M,Co), rabbit_log:message(D,C,M,Co)).
+-else.
+-define(LOGDEBUG0(F), ok).
+-define(LOGDEBUG(F,A), ok).
+-define(LOGMESSAGE(D,C,M,Co), ok).
+-endif.
64 include/rabbit_backing_queue_spec.hrl
@@ -0,0 +1,64 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-type(fetch_result() ::
+ %% Message, IsDelivered, AckTag, Remaining_Len
+ ('empty'|{rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})).
+-type(is_durable() :: boolean()).
+-type(attempt_recovery() :: boolean()).
+-type(purged_msg_count() :: non_neg_integer()).
+-type(ack_required() :: boolean()).
+
+-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
+-spec(stop/0 :: () -> 'ok').
+-spec(init/3 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery()) -> state()).
+-spec(terminate/1 :: (state()) -> state()).
+-spec(delete_and_terminate/1 :: (state()) -> state()).
+-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
+-spec(publish/2 :: (rabbit_types:basic_message(), state()) -> state()).
+-spec(publish_delivered/3 ::
+ (ack_required(), rabbit_types:basic_message(), state()) -> {ack(), state()}).
+-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
+-spec(ack/2 :: ([ack()], state()) -> state()).
+-spec(tx_publish/3 :: (rabbit_types:txn(), rabbit_types:basic_message(), state()) -> state()).
+-spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()).
+-spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}).
+-spec(tx_commit/3 :: (rabbit_types:txn(), fun (() -> any()), state()) -> {[ack()], state()}).
+-spec(requeue/2 :: ([ack()], state()) -> state()).
+-spec(len/1 :: (state()) -> non_neg_integer()).
+-spec(is_empty/1 :: (state()) -> boolean()).
+-spec(set_ram_duration_target/2 ::
+ (('undefined' | 'infinity' | number()), state()) -> state()).
+-spec(ram_duration/1 :: (state()) -> {number(), state()}).
+-spec(needs_idle_timeout/1 :: (state()) -> boolean()).
+-spec(idle_timeout/1 :: (state()) -> state()).
+-spec(handle_pre_hibernate/1 :: (state()) -> state()).
+-spec(status/1 :: (state()) -> [{atom(), any()}]).
50 include/rabbit_exchange_type_spec.hrl
@@ -0,0 +1,50 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+-ifdef(use_specs).
+
+-spec(description/0 :: () -> [{atom(), any()}]).
+-spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
+ -> {rabbit_router:routing_result(), [pid()]}).
+-spec(validate/1 :: (rabbit_types:exchange()) -> 'ok').
+-spec(create/1 :: (rabbit_types:exchange()) -> 'ok').
+-spec(recover/2 :: (rabbit_types:exchange(),
+ [rabbit_types:binding()]) -> 'ok').
+-spec(delete/2 :: (rabbit_types:exchange(),
+ [rabbit_types:binding()]) -> 'ok').
+-spec(add_binding/2 :: (rabbit_types:exchange(),
+ rabbit_types:binding()) -> 'ok').
+-spec(remove_bindings/2 :: (rabbit_types:exchange(),
+ [rabbit_types:binding()]) -> 'ok').
+-spec(assert_args_equivalence/2 ::
+ (rabbit_types:exchange(), rabbit_framing:amqp_table())
+ -> 'ok' | rabbit_types:connection_exit()).
+
+-endif.
170 include/rabbit_framing.hrl
@@ -0,0 +1,170 @@
+%% Autogenerated code. Do not edit.
+%%
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+-define(PROTOCOL_PORT, 5672).
+-define(FRAME_METHOD, 1).
+-define(FRAME_HEADER, 2).
+-define(FRAME_BODY, 3).
+-define(FRAME_HEARTBEAT, 8).
+-define(FRAME_MIN_SIZE, 4096).
+-define(FRAME_END, 206).
+-define(REPLY_SUCCESS, 200).
+-define(CONTENT_TOO_LARGE, 311).
+-define(NO_ROUTE, 312).
+-define(NO_CONSUMERS, 313).
+-define(ACCESS_REFUSED, 403).
+-define(NOT_FOUND, 404).
+-define(RESOURCE_LOCKED, 405).
+-define(PRECONDITION_FAILED, 406).
+-define(CONNECTION_FORCED, 320).
+-define(INVALID_PATH, 402).
+-define(FRAME_ERROR, 501).
+-define(SYNTAX_ERROR, 502).
+-define(COMMAND_INVALID, 503).
+-define(CHANNEL_ERROR, 504).
+-define(UNEXPECTED_FRAME, 505).
+-define(RESOURCE_ERROR, 506).
+-define(NOT_ALLOWED, 530).
+-define(NOT_IMPLEMENTED, 540).
+-define(INTERNAL_ERROR, 541).
+-define(FRAME_OOB_METHOD, 4).
+-define(FRAME_OOB_HEADER, 5).
+-define(FRAME_OOB_BODY, 6).
+-define(FRAME_TRACE, 7).
+-define(NOT_DELIVERED, 310).
+%% Method field records.
+-record('connection.start', {version_major = 0, version_minor = 9, server_properties, mechanisms = <<"PLAIN">>, locales = <<"en_US">>}).
+-record('connection.start_ok', {client_properties, mechanism = <<"PLAIN">>, response, locale = <<"en_US">>}).
+-record('connection.secure', {challenge}).
+-record('connection.secure_ok', {response}).
+-record('connection.tune', {channel_max = 0, frame_max = 0, heartbeat = 0}).
+-record('connection.tune_ok', {channel_max = 0, frame_max = 0, heartbeat = 0}).
+-record('connection.open', {virtual_host = <<"/">>, capabilities = <<"">>, insist = false}).
+-record('connection.open_ok', {known_hosts = <<"">>}).
+-record('connection.close', {reply_code, reply_text = <<"">>, class_id, method_id}).
+-record('connection.close_ok', {}).
+-record('connection.redirect', {host, known_hosts = <<"">>}).
+-record('channel.open', {out_of_band = <<"">>}).
+-record('channel.open_ok', {channel_id = <<"">>}).
+-record('channel.flow', {active}).
+-record('channel.flow_ok', {active}).
+-record('channel.close', {reply_code, reply_text = <<"">>, class_id, method_id}).
+-record('channel.close_ok', {}).
+-record('channel.alert', {reply_code, reply_text = <<"">>, details = []}).
+-record('access.request', {realm = <<"/data">>, exclusive = false, passive = true, active = true, write = true, read = true}).
+-record('access.request_ok', {ticket = 1}).
+-record('exchange.declare', {ticket = 0, exchange, type = <<"direct">>, passive = false, durable = false, auto_delete = false, internal = false, nowait = false, arguments = []}).
+-record('exchange.declare_ok', {}).
+-record('exchange.delete', {ticket = 0, exchange, if_unused = false, nowait = false}).
+-record('exchange.delete_ok', {}).
+-record('queue.declare', {ticket = 0, queue = <<"">>, passive = false, durable = false, exclusive = false, auto_delete = false, nowait = false, arguments = []}).
+-record('queue.declare_ok', {queue, message_count, consumer_count}).
+-record('queue.bind', {ticket = 0, queue, exchange, routing_key = <<"">>, nowait = false, arguments = []}).
+-record('queue.bind_ok', {}).
+-record('queue.purge', {ticket = 0, queue, nowait = false}).
+-record('queue.purge_ok', {message_count}).
+-record('queue.delete', {ticket = 0, queue, if_unused = false, if_empty = false, nowait = false}).
+-record('queue.delete_ok', {message_count}).
+-record('queue.unbind', {ticket = 0, queue, exchange, routing_key = <<"">>, arguments = []}).
+-record('queue.unbind_ok', {}).
+-record('basic.qos', {prefetch_size = 0, prefetch_count = 0, global = false}).
+-record('basic.qos_ok', {}).
+-record('basic.consume', {ticket = 0, queue, consumer_tag = <<"">>, no_local = false, no_ack = false, exclusive = false, nowait = false, arguments = []}).
+-record('basic.consume_ok', {consumer_tag}).
+-record('basic.cancel', {consumer_tag, nowait = false}).
+-record('basic.cancel_ok', {consumer_tag}).
+-record('basic.publish', {ticket = 0, exchange = <<"">>, routing_key = <<"">>, mandatory = false, immediate = false}).
+-record('basic.return', {reply_code, reply_text = <<"">>, exchange, routing_key}).
+-record('basic.deliver', {consumer_tag, delivery_tag, redelivered = false, exchange, routing_key}).
+-record('basic.get', {ticket = 0, queue, no_ack = false}).
+-record('basic.get_ok', {delivery_tag, redelivered = false, exchange, routing_key, message_count}).
+-record('basic.get_empty', {cluster_id = <<"">>}).
+-record('basic.ack', {delivery_tag = 0, multiple = false}).
+-record('basic.reject', {delivery_tag, requeue = true}).
+-record('basic.recover_async', {requeue = false}).
+-record('basic.recover', {requeue = false}).
+-record('basic.recover_ok', {}).
+-record('tx.select', {}).
+-record('tx.select_ok', {}).
+-record('tx.commit', {}).
+-record('tx.commit_ok', {}).
+-record('tx.rollback', {}).
+-record('tx.rollback_ok', {}).
+-record('file.qos', {prefetch_size = 0, prefetch_count = 0, global = false}).
+-record('file.qos_ok', {}).
+-record('file.consume', {ticket = 1, queue, consumer_tag = <<"">>, no_local = false, no_ack = false, exclusive = false, nowait = false}).
+-record('file.consume_ok', {consumer_tag}).
+-record('file.cancel', {consumer_tag, nowait = false}).
+-record('file.cancel_ok', {consumer_tag}).
+-record('file.open', {identifier, content_size}).
+-record('file.open_ok', {staged_size}).
+-record('file.stage', {}).
+-record('file.publish', {ticket = 1, exchange = <<"">>, routing_key = <<"">>, mandatory = false, immediate = false, identifier}).
+-record('file.return', {reply_code = 200, reply_text = <<"">>, exchange, routing_key}).
+-record('file.deliver', {consumer_tag, delivery_tag, redelivered = false, exchange, routing_key, identifier}).
+-record('file.ack', {delivery_tag = 0, multiple = false}).
+-record('file.reject', {delivery_tag, requeue = true}).
+-record('stream.qos', {prefetch_size = 0, prefetch_count = 0, consume_rate = 0, global = false}).
+-record('stream.qos_ok', {}).
+-record('stream.consume', {ticket = 1, queue, consumer_tag = <<"">>, no_local = false, exclusive = false, nowait = false}).
+-record('stream.consume_ok', {consumer_tag}).
+-record('stream.cancel', {consumer_tag, nowait = false}).
+-record('stream.cancel_ok', {consumer_tag}).
+-record('stream.publish', {ticket = 1, exchange = <<"">>, routing_key = <<"">>, mandatory = false, immediate = false}).
+-record('stream.return', {reply_code = 200, reply_text = <<"">>, exchange, routing_key}).
+-record('stream.deliver', {consumer_tag, delivery_tag, exchange, queue}).
+-record('dtx.select', {}).
+-record('dtx.select_ok', {}).
+-record('dtx.start', {dtx_identifier}).
+-record('dtx.start_ok', {}).
+-record('tunnel.request', {meta_data}).
+-record('test.integer', {integer_1, integer_2, integer_3, integer_4, operation}).
+-record('test.integer_ok', {result}).
+-record('test.string', {string_1, string_2, operation}).
+-record('test.string_ok', {result}).
+-record('test.table', {table, integer_op, string_op}).
+-record('test.table_ok', {integer_result, string_result}).
+-record('test.content', {}).
+-record('test.content_ok', {content_checksum}).
+%% Class property records.
+-record('P_connection', {}).
+-record('P_channel', {}).
+-record('P_access', {}).
+-record('P_exchange', {}).
+-record('P_queue', {}).
+-record('P_basic', {content_type, content_encoding, headers, delivery_mode, priority, correlation_id, reply_to, expiration, message_id, timestamp, type, user_id, app_id, cluster_id}).
+-record('P_tx', {}).
+-record('P_file', {content_type, content_encoding, headers, priority, reply_to, message_id, filename, timestamp, cluster_id}).
+-record('P_stream', {content_type, content_encoding, headers, priority, timestamp}).
+-record('P_dtx', {}).
+-record('P_tunnel', {headers, proxy_name, data_name, durable, broadcast}).
+-record('P_test', {}).
41 include/rabbit_msg_store.hrl
@@ -0,0 +1,41 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-include("rabbit.hrl").
+
+-ifdef(use_specs).
+
+-type(msg() :: any()).
+
+-endif.
+
+-record(msg_location,
+ {guid, ref_count, file, offset, total_size}).
59 include/rabbit_msg_store_index.hrl
@@ -0,0 +1,59 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-include("rabbit_msg_store.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(dir() :: any()).
+-type(index_state() :: any()).
+-type(keyvalue() :: any()).
+-type(fieldpos() :: non_neg_integer()).
+-type(fieldvalue() :: any()).
+
+-spec(new/1 :: (dir()) -> index_state()).
+-spec(recover/1 :: (dir()) -> rabbit_types:ok_or_error2(index_state(), any())).
+-spec(lookup/2 ::
+ (rabbit_guid:guid(), index_state()) -> ('not_found' | keyvalue())).
+-spec(insert/2 :: (keyvalue(), index_state()) -> 'ok').
+-spec(update/2 :: (keyvalue(), index_state()) -> 'ok').
+-spec(update_fields/3 :: (rabbit_guid:guid(), ({fieldpos(), fieldvalue()} |
+ [{fieldpos(), fieldvalue()}]),
+ index_state()) -> 'ok').
+-spec(delete/2 :: (rabbit_guid:guid(), index_state()) -> 'ok').
+-spec(delete_by_file/2 :: (fieldvalue(), index_state()) -> 'ok').
+-spec(terminate/1 :: (index_state()) -> any()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
1,139 src/gen_server2.erl
@@ -0,0 +1,1139 @@
+%% This file is a copy of gen_server.erl from the R13B-1 Erlang/OTP
+%% distribution, with the following modifications:
+%%
+%% 1) the module name is gen_server2
+%%
+%% 2) more efficient handling of selective receives in callbacks
+%% gen_server2 processes drain their message queue into an internal
+%% buffer before invoking any callback module functions. Messages are
+%% dequeued from the buffer for processing. Thus the effective message
+%% queue of a gen_server2 process is the concatenation of the internal
+%% buffer and the real message queue.
+%% As a result of the draining, any selective receive invoked inside a
+%% callback is less likely to have to scan a large message queue.
+%%
+%% 3) gen_server2:cast is guaranteed to be order-preserving
+%% The original code could reorder messages when communicating with a
+%% process on a remote node that was not currently connected.
+%%
+%% 4) The new functions gen_server2:pcall/3, pcall/4, and pcast/3
+%% allow callers to attach priorities to requests. Requests with
+%% higher priorities are processed before requests with lower
+%% priorities. The default priority is 0.
+%%
+%% 5) The callback module can optionally implement
+%% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be
+%% called immediately prior to and post hibernation, respectively. If
+%% handle_pre_hibernate returns {hibernate, NewState} then the process
+%% will hibernate. If the module does not implement
+%% handle_pre_hibernate/1 then the default action is to hibernate.
+%%
+%% 6) init can return a 4th arg, {backoff, InitialTimeout,
+%% MinimumTimeout, DesiredHibernatePeriod} (all in
+%% milliseconds). Then, on all callbacks which can return a timeout
+%% (including init), timeout can be 'hibernate'. When this is the
+%% case, the current timeout value will be used (initially, the
+%% InitialTimeout supplied from init). After this timeout has
+%% occurred, hibernation will occur as normal. Upon awaking, a new
+%% current timeout value will be calculated.
+%%
+%% The purpose is that the gen_server2 takes care of adjusting the
+%% current timeout value such that the process will increase the
+%% timeout value repeatedly if it is unable to sleep for the
+%% DesiredHibernatePeriod. If it is able to sleep for the
+%% DesiredHibernatePeriod it will decrease the current timeout down to
+%% the MinimumTimeout, so that the process is put to sleep sooner (and
+%% hopefully stays asleep for longer). In short, should a process
+%% using this receive a burst of messages, it should not hibernate
+%% between those messages, but as the messages become less frequent,
+%% the process will not only hibernate, it will do so sooner after
+%% each message.
+%%
+%% When using this backoff mechanism, normal timeout values (i.e. not
+%% 'hibernate') can still be used, and if they are used then the
+%% handle_info(timeout, State) will be called as normal. In this case,
+%% returning 'hibernate' from handle_info(timeout, State) will not
+%% hibernate the process immediately, as it would if backoff wasn't
+%% being used. Instead it'll wait for the current timeout as described
+%% above.
+
+%% All modifications are (C) 2009-2010 LShift Ltd.
+
+%% ``The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved via the world wide web at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
+%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
+%% AB. All Rights Reserved.''
+%%
+%% $Id$
+%%
+-module(gen_server2).
+
+%%% ---------------------------------------------------
+%%%
+%%% The idea behind THIS server is that the user module
+%%% provides (different) functions to handle different
+%%% kind of inputs.
+%%% If the Parent process terminates the Module:terminate/2
+%%% function is called.
+%%%
+%%% The user module should export:
+%%%
+%%% init(Args)
+%%% ==> {ok, State}
+%%% {ok, State, Timeout}
+%%% {ok, State, Timeout, Backoff}
+%%% ignore
+%%% {stop, Reason}
+%%%
+%%% handle_call(Msg, {From, Tag}, State)
+%%%
+%%% ==> {reply, Reply, State}
+%%% {reply, Reply, State, Timeout}
+%%% {noreply, State}
+%%% {noreply, State, Timeout}
+%%% {stop, Reason, Reply, State}
+%%% Reason = normal | shutdown | Term terminate(State) is called
+%%%
+%%% handle_cast(Msg, State)
+%%%
+%%% ==> {noreply, State}
+%%% {noreply, State, Timeout}
+%%% {stop, Reason, State}
+%%% Reason = normal | shutdown | Term terminate(State) is called
+%%%
+%%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ...
+%%%
+%%% ==> {noreply, State}
+%%% {noreply, State, Timeout}
+%%% {stop, Reason, State}
+%%% Reason = normal | shutdown | Term, terminate(State) is called
+%%%
+%%% terminate(Reason, State) Let the user module clean up
+%%% always called when server terminates
+%%%
+%%% ==> ok
+%%%
+%%% handle_pre_hibernate(State)
+%%%
+%%% ==> {hibernate, State}
+%%% {stop, Reason, State}
+%%% Reason = normal | shutdown | Term, terminate(State) is called
+%%%
+%%% handle_post_hibernate(State)
+%%%
+%%% ==> {noreply, State}
+%%% {stop, Reason, State}
+%%% Reason = normal | shutdown | Term, terminate(State) is called
+%%%
+%%% The work flow (of the server) can be described as follows:
+%%%
+%%% User module Generic
+%%% ----------- -------
+%%% start -----> start
+%%% init <----- .
+%%%
+%%% loop
+%%% handle_call <----- .
+%%% -----> reply
+%%%
+%%% handle_cast <----- .
+%%%
+%%% handle_info <----- .
+%%%
+%%% terminate <----- .
+%%%
+%%% -----> reply
+%%%
+%%%
+%%% ---------------------------------------------------
+
+%% API
+-export([start/3, start/4,
+ start_link/3, start_link/4,
+ call/2, call/3, pcall/3, pcall/4,
+ cast/2, pcast/3, reply/2,
+ abcast/2, abcast/3,
+ multi_call/2, multi_call/3, multi_call/4,
+ enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/7]).
+
+-export([behaviour_info/1]).
+
+%% System exports
+-export([system_continue/3,
+ system_terminate/4,
+ system_code_change/4,
+ format_status/2]).
+
+%% Internal exports
+-export([init_it/6, print_event/3]).
+
+-import(error_logger, [format/2]).
+
+%%%=========================================================================
+%%% Specs. These exist only to shut up dialyzer's warnings
+%%%=========================================================================
+
+-ifdef(use_specs).
+
+-spec(handle_common_termination/6 ::
+ (any(), any(), any(), atom(), any(), any()) -> no_return()).
+
+-spec(hibernate/7 ::
+ (pid(), any(), any(), atom(), any(), queue(), any()) -> no_return()).
+
+-endif.
+
+%%%=========================================================================
+%%% API
+%%%=========================================================================
+
+behaviour_info(callbacks) ->
+ [{init,1},{handle_call,3},{handle_cast,2},{handle_info,2},
+ {terminate,2},{code_change,3}];
+behaviour_info(_Other) ->
+ undefined.
+
+%%% -----------------------------------------------------------------
+%%% Starts a generic server.
+%%% start(Mod, Args, Options)
+%%% start(Name, Mod, Args, Options)
+%%% start_link(Mod, Args, Options)
+%%% start_link(Name, Mod, Args, Options) where:
+%%% Name ::= {local, atom()} | {global, atom()}
+%%% Mod ::= atom(), callback module implementing the 'real' server
+%%% Args ::= term(), init arguments (to Mod:init/1)
+%%% Options ::= [{timeout, Timeout} | {debug, [Flag]}]
+%%% Flag ::= trace | log | {logfile, File} | statistics | debug
+%%% (debug == log && statistics)
+%%% Returns: {ok, Pid} |
+%%% {error, {already_started, Pid}} |
+%%% {error, Reason}
+%%% -----------------------------------------------------------------
+start(Mod, Args, Options) ->
+ gen:start(?MODULE, nolink, Mod, Args, Options).
+
+start(Name, Mod, Args, Options) ->
+ gen:start(?MODULE, nolink, Name, Mod, Args, Options).
+
+start_link(Mod, Args, Options) ->
+ gen:start(?MODULE, link, Mod, Args, Options).
+
+start_link(Name, Mod, Args, Options) ->
+ gen:start(?MODULE, link, Name, Mod, Args, Options).
+
+
+%% -----------------------------------------------------------------
+%% Make a call to a generic server.
+%% If the server is located at another node, that node will
+%% be monitored.
+%% If the client is trapping exits and is linked server termination
+%% is handled here (? Shall we do that here (or rely on timeouts) ?).
+%% -----------------------------------------------------------------
+call(Name, Request) ->
+ case catch gen:call(Name, '$gen_call', Request) of
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, call, [Name, Request]}})
+ end.
+
+call(Name, Request, Timeout) ->
+ case catch gen:call(Name, '$gen_call', Request, Timeout) of
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
+ end.
+
+pcall(Name, Priority, Request) ->
+ case catch gen:call(Name, '$gen_pcall', {Priority, Request}) of
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, pcall, [Name, Priority, Request]}})
+ end.
+
+pcall(Name, Priority, Request, Timeout) ->
+ case catch gen:call(Name, '$gen_pcall', {Priority, Request}, Timeout) of
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, pcall, [Name, Priority, Request, Timeout]}})
+ end.
+
+%% -----------------------------------------------------------------
+%% Make a cast to a generic server.
+%% -----------------------------------------------------------------
+cast({global,Name}, Request) ->
+ catch global:send(Name, cast_msg(Request)),
+ ok;
+cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) ->
+ do_cast(Dest, Request);
+cast(Dest, Request) when is_atom(Dest) ->
+ do_cast(Dest, Request);
+cast(Dest, Request) when is_pid(Dest) ->
+ do_cast(Dest, Request).
+
+do_cast(Dest, Request) ->
+ do_send(Dest, cast_msg(Request)),
+ ok.
+
+cast_msg(Request) -> {'$gen_cast',Request}.
+
+pcast({global,Name}, Priority, Request) ->
+ catch global:send(Name, cast_msg(Priority, Request)),
+ ok;
+pcast({Name,Node}=Dest, Priority, Request) when is_atom(Name), is_atom(Node) ->
+ do_cast(Dest, Priority, Request);
+pcast(Dest, Priority, Request) when is_atom(Dest) ->
+ do_cast(Dest, Priority, Request);
+pcast(Dest, Priority, Request) when is_pid(Dest) ->
+ do_cast(Dest, Priority, Request).
+
+do_cast(Dest, Priority, Request) ->
+ do_send(Dest, cast_msg(Priority, Request)),
+ ok.
+
+cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}.
+
+%% -----------------------------------------------------------------
+%% Send a reply to the client.
+%% -----------------------------------------------------------------
+reply({To, Tag}, Reply) ->
+ catch To ! {Tag, Reply}.
+
+%% -----------------------------------------------------------------
+%% Asyncronous broadcast, returns nothing, it's just send'n prey
+%%-----------------------------------------------------------------
+abcast(Name, Request) when is_atom(Name) ->
+ do_abcast([node() | nodes()], Name, cast_msg(Request)).
+
+abcast(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) ->
+ do_abcast(Nodes, Name, cast_msg(Request)).
+
+do_abcast([Node|Nodes], Name, Msg) when is_atom(Node) ->
+ do_send({Name,Node},Msg),
+ do_abcast(Nodes, Name, Msg);
+do_abcast([], _,_) -> abcast.
+
+%%% -----------------------------------------------------------------
+%%% Make a call to servers at several nodes.
+%%% Returns: {[Replies],[BadNodes]}
+%%% A Timeout can be given
+%%%
+%%% A middleman process is used in case late answers arrives after
+%%% the timeout. If they would be allowed to glog the callers message
+%%% queue, it would probably become confused. Late answers will
+%%% now arrive to the terminated middleman and so be discarded.
+%%% -----------------------------------------------------------------
+multi_call(Name, Req)
+ when is_atom(Name) ->
+ do_multi_call([node() | nodes()], Name, Req, infinity).
+
+multi_call(Nodes, Name, Req)
+ when is_list(Nodes), is_atom(Name) ->
+ do_multi_call(Nodes, Name, Req, infinity).
+
+multi_call(Nodes, Name, Req, infinity) ->
+ do_multi_call(Nodes, Name, Req, infinity);
+multi_call(Nodes, Name, Req, Timeout)
+ when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
+ do_multi_call(Nodes, Name, Req, Timeout).
+
+
+%%-----------------------------------------------------------------
+%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_
+%%
+%% Description: Makes an existing process into a gen_server.
+%% The calling process will enter the gen_server receive
+%% loop and become a gen_server process.
+%% The process *must* have been started using one of the
+%% start functions in proc_lib, see proc_lib(3).
+%% The user is responsible for any initialization of the
+%% process, including registering a name for it.
+%%-----------------------------------------------------------------
+enter_loop(Mod, Options, State) ->
+ enter_loop(Mod, Options, State, self(), infinity, undefined).
+
+enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) ->
+ enter_loop(Mod, Options, State, self(), infinity, Backoff);
+
+enter_loop(Mod, Options, State, ServerName = {_, _}) ->
+ enter_loop(Mod, Options, State, ServerName, infinity, undefined);
+
+enter_loop(Mod, Options, State, Timeout) ->
+ enter_loop(Mod, Options, State, self(), Timeout, undefined).
+
+enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) ->
+ enter_loop(Mod, Options, State, ServerName, infinity, Backoff);
+
+enter_loop(Mod, Options, State, ServerName, Timeout) ->
+ enter_loop(Mod, Options, State, ServerName, Timeout, undefined).
+
+enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) ->
+ Name = get_proc_name(ServerName),
+ Parent = get_parent(),
+ Debug = debug_options(Name, Options),
+ Queue = priority_queue:new(),
+ Backoff1 = extend_backoff(Backoff),
+ loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug).
+
+%%%========================================================================
+%%% Gen-callback functions
+%%%========================================================================
+
+%%% ---------------------------------------------------
+%%% Initiate the new process.
+%%% Register the name using the Rfunc function
+%%% Calls the Mod:init/Args function.
+%%% Finally an acknowledge is sent to Parent and the main
+%%% loop is entered.
+%%% ---------------------------------------------------
+init_it(Starter, self, Name, Mod, Args, Options) ->
+ init_it(Starter, self(), Name, Mod, Args, Options);
+init_it(Starter, Parent, Name0, Mod, Args, Options) ->
+ Name = name(Name0),
+ Debug = debug_options(Name, Options),
+ Queue = priority_queue:new(),
+ case catch Mod:init(Args) of
+ {ok, State} ->
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug);
+ {ok, State, Timeout} ->
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug);
+ {ok, State, Timeout, Backoff = {backoff, _, _, _}} ->
+ Backoff1 = extend_backoff(Backoff),
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug);
+ {stop, Reason} ->
+ %% For consistency, we must make sure that the
+ %% registered name (if any) is unregistered before
+ %% the parent process is notified about the failure.
+ %% (Otherwise, the parent process could get
+ %% an 'already_started' error if it immediately
+ %% tried starting the process again.)
+ unregister_name(Name0),
+ proc_lib:init_ack(Starter, {error, Reason}),
+ exit(Reason);
+ ignore ->
+ unregister_name(Name0),
+ proc_lib:init_ack(Starter, ignore),
+ exit(normal);
+ {'EXIT', Reason} ->
+ unregister_name(Name0),
+ proc_lib:init_ack(Starter, {error, Reason}),
+ exit(Reason);
+ Else ->
+ Error = {bad_return_value, Else},
+ proc_lib:init_ack(Starter, {error, Error}),
+ exit(Error)
+ end.
+
+name({local,Name}) -> Name;
+name({global,Name}) -> Name;
+%% name(Pid) when is_pid(Pid) -> Pid;
+%% when R12 goes away, drop the line beneath and uncomment the line above
+name(Name) -> Name.
+
+unregister_name({local,Name}) ->
+ _ = (catch unregister(Name));
+unregister_name({global,Name}) ->
+ _ = global:unregister_name(Name);
+unregister_name(Pid) when is_pid(Pid) ->
+ Pid;
+% Under R12 let's just ignore it, as we have a single term as Name.
+% On R13 it will never get here, as we get tuple with 'local/global' atom.
+unregister_name(_Name) -> ok.
+
+extend_backoff(undefined) ->
+ undefined;
+extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) ->
+ {backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod, now()}.
+
+%%%========================================================================
+%%% Internal functions
+%%%========================================================================
+%%% ---------------------------------------------------
+%%% The MAIN loop.
+%%% ---------------------------------------------------
+loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) ->
+ pre_hibernate(Parent, Name, State, Mod, undefined, Queue, Debug);
+loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
+ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState,
+ drain(Queue), Debug).
+
+drain(Queue) ->
+ receive
+ Input -> drain(in(Input, Queue))
+ after 0 -> Queue
+ end.
+
+process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
+ case priority_queue:out(Queue) of
+ {{value, Msg}, Queue1} ->
+ process_msg(Parent, Name, State, Mod,
+ Time, TimeoutState, Queue1, Debug, Msg);
+ {empty, Queue1} ->
+ {Time1, HibOnTimeout}
+ = case {Time, TimeoutState} of
+ {hibernate, {backoff, Current, _Min, _Desired, _RSt}} ->
+ {Current, true};
+ {hibernate, _} ->
+ %% wake_hib/7 will set Time to hibernate. If
+ %% we were woken and didn't receive a msg
+ %% then we will get here and need a sensible
+ %% value for Time1, otherwise we crash.
+ %% R13B1 always waits infinitely when waking
+ %% from hibernation, so that's what we do
+ %% here too.
+ {infinity, false};
+ _ -> {Time, false}
+ end,
+ receive
+ Input ->
+ %% Time could be 'hibernate' here, so *don't* call loop
+ process_next_msg(
+ Parent, Name, State, Mod, Time, TimeoutState,
+ drain(in(Input, Queue1)), Debug)
+ after Time1 ->
+ case HibOnTimeout of
+ true ->
+ pre_hibernate(
+ Parent, Name, State, Mod, TimeoutState, Queue1,
+ Debug);
+ false ->
+ process_msg(
+ Parent, Name, State, Mod, Time, TimeoutState,
+ Queue1, Debug, timeout)
+ end
+ end
+ end.
+
+wake_hib(Parent, Name, State, Mod, TS, Queue, Debug) ->
+ TimeoutState1 = case TS of
+ undefined ->
+ undefined;
+ {SleptAt, TimeoutState} ->
+ adjust_timeout_state(SleptAt, now(), TimeoutState)
+ end,
+ post_hibernate(Parent, Name, State, Mod, TimeoutState1,
+ drain(Queue), Debug).
+
+hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+ TS = case TimeoutState of
+ undefined -> undefined;
+ {backoff, _, _, _, _} -> {now(), TimeoutState}
+ end,
+ proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod,
+ TS, Queue, Debug]).
+
+pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+ case erlang:function_exported(Mod, handle_pre_hibernate, 1) of
+ true ->
+ case catch Mod:handle_pre_hibernate(State) of
+ {hibernate, NState} ->
+ hibernate(Parent, Name, NState, Mod, TimeoutState, Queue,
+ Debug);
+ Reply ->
+ handle_common_termination(Reply, Name, pre_hibernate,
+ Mod, State, Debug)
+ end;
+ false ->
+ hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug)
+ end.
+
+post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+ case erlang:function_exported(Mod, handle_post_hibernate, 1) of
+ true ->
+ case catch Mod:handle_post_hibernate(State) of
+ {noreply, NState} ->
+ process_next_msg(Parent, Name, NState, Mod, infinity,
+ TimeoutState, Queue, Debug);
+ {noreply, NState, Time} ->
+ process_next_msg(Parent, Name, NState, Mod, Time,
+ TimeoutState, Queue, Debug);
+ Reply ->
+ handle_common_termination(Reply, Name, post_hibernate,
+ Mod, State, Debug)
+ end;
+ false ->
+ %% use hibernate here, not infinity. This matches
+ %% R13B. The key is that we should be able to get through
+ %% to process_msg calling sys:handle_system_msg with Time
+ %% still set to hibernate, iff that msg is the very msg
+ %% that woke us up (or the first msg we receive after
+ %% waking up).
+ process_next_msg(Parent, Name, State, Mod, hibernate,
+ TimeoutState, Queue, Debug)
+ end.
+
+adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
+ DesiredHibPeriod, RandomState}) ->
+ NapLengthMicros = timer:now_diff(AwokeAt, SleptAt),
+ CurrentMicros = CurrentTO * 1000,
+ MinimumMicros = MinimumTO * 1000,
+ DesiredHibMicros = DesiredHibPeriod * 1000,
+ GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros,
+ Base =
+ %% If enough time has passed between the last two messages then we
+ %% should consider sleeping sooner. Otherwise stay awake longer.
+ case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of
+ true -> lists:max([MinimumTO, CurrentTO div 2]);
+ false -> CurrentTO
+ end,
+ {Extra, RandomState1} = random:uniform_s(Base, RandomState),
+ CurrentTO1 = Base + Extra,
+ {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}.
+
+in({'$gen_pcast', {Priority, Msg}}, Queue) ->
+ priority_queue:in({'$gen_cast', Msg}, Priority, Queue);
+in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->
+ priority_queue:in({'$gen_call', From, Msg}, Priority, Queue);
+in(Input, Queue) ->
+ priority_queue:in(Input, Queue).
+
+process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
+ Debug, Msg) ->
+ case Msg of
+ {system, From, Req} ->
+ sys:handle_system_msg(
+ Req, From, Parent, ?MODULE, Debug,
+ [Name, State, Mod, Time, TimeoutState, Queue]);
+ %% gen_server puts Hib on the end as the 7th arg, but that
+ %% version of the function seems not to be documented so
+ %% leaving out for now.
+ {'EXIT', Parent, Reason} ->
+ terminate(Reason, Name, Msg, Mod, State, Debug);
+ _Msg when Debug =:= [] ->
+ handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue);
+ _Msg ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
+ Name, {in, Msg}),
+ handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue,
+ Debug1)
+ end.
+
+%%% ---------------------------------------------------
+%%% Send/recive functions
+%%% ---------------------------------------------------
+do_send(Dest, Msg) ->
+ catch erlang:send(Dest, Msg).
+
+do_multi_call(Nodes, Name, Req, infinity) ->
+ Tag = make_ref(),
+ Monitors = send_nodes(Nodes, Name, Tag, Req),
+ rec_nodes(Tag, Monitors, Name, undefined);
+do_multi_call(Nodes, Name, Req, Timeout) ->
+ Tag = make_ref(),
+ Caller = self(),
+ Receiver =
+ spawn(
+ fun () ->
+ %% Middleman process. Should be unsensitive to regular
+ %% exit signals. The sychronization is needed in case
+ %% the receiver would exit before the caller started
+ %% the monitor.
+ process_flag(trap_exit, true),
+ Mref = erlang:monitor(process, Caller),
+ receive
+ {Caller,Tag} ->
+ Monitors = send_nodes(Nodes, Name, Tag, Req),
+ TimerId = erlang:start_timer(Timeout, self(), ok),
+ Result = rec_nodes(Tag, Monitors, Name, TimerId),
+ exit({self(),Tag,Result});
+ {'DOWN',Mref,_,_,_} ->
+ %% Caller died before sending us the go-ahead.
+ %% Give up silently.
+ exit(normal)
+ end
+ end),
+ Mref = erlang:monitor(process, Receiver),
+ Receiver ! {self(),Tag},
+ receive
+ {'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
+ Result;
+ {'DOWN',Mref,_,_,Reason} ->
+ %% The middleman code failed. Or someone did
+ %% exit(_, kill) on the middleman process => Reason==killed
+ exit(Reason)
+ end.
+
+send_nodes(Nodes, Name, Tag, Req) ->
+ send_nodes(Nodes, Name, Tag, Req, []).
+
+send_nodes([Node|Tail], Name, Tag, Req, Monitors)
+ when is_atom(Node) ->
+ Monitor = start_monitor(Node, Name),
+ %% Handle non-existing names in rec_nodes.
+ catch {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Req},
+ send_nodes(Tail, Name, Tag, Req, [Monitor | Monitors]);
+send_nodes([_Node|Tail], Name, Tag, Req, Monitors) ->
+ %% Skip non-atom Node
+ send_nodes(Tail, Name, Tag, Req, Monitors);
+send_nodes([], _Name, _Tag, _Req, Monitors) ->
+ Monitors.
+
+%% Against old nodes:
+%% If no reply has been delivered within 2 secs. (per node) check that
+%% the server really exists and wait for ever for the answer.
+%%
+%% Against contemporary nodes:
+%% Wait for reply, server 'DOWN', or timeout from TimerId.
+
+rec_nodes(Tag, Nodes, Name, TimerId) ->
+ rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId).
+
+rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) ->
+ receive
+ {'DOWN', R, _, _, _} ->
+ rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ unmonitor(R),
+ rec_nodes(Tag, Tail, Name, Badnodes,
+ [{N,Reply}|Replies], Time, TimerId);
+ {timeout, TimerId, _} ->
+ unmonitor(R),
+ %% Collect all replies that already have arrived
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+ end;
+rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) ->
+ %% R6 node
+ receive
+ {nodedown, N} ->
+ monitor_node(N, false),
+ rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes(Tag, Tail, Name, Badnodes,
+ [{N,Reply}|Replies], 2000, TimerId);
+ {timeout, TimerId, _} ->
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ %% Collect all replies that already have arrived
+ rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
+ after Time ->
+ case rpc:call(N, erlang, whereis, [Name]) of
+ Pid when is_pid(Pid) -> % It exists try again.
+ rec_nodes(Tag, [N|Tail], Name, Badnodes,
+ Replies, infinity, TimerId);
+ _ -> % badnode
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes(Tag, Tail, Name, [N|Badnodes],
+ Replies, 2000, TimerId)
+ end
+ end;
+rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) ->
+ case catch erlang:cancel_timer(TimerId) of
+ false -> % It has already sent it's message
+ receive
+ {timeout, TimerId, _} -> ok
+ after 0 ->
+ ok
+ end;
+ _ -> % Timer was cancelled, or TimerId was 'undefined'
+ ok
+ end,
+ {Replies, Badnodes}.
+
+%% Collect all replies that already have arrived
+rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) ->
+ receive
+ {'DOWN', R, _, _, _} ->
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ unmonitor(R),
+ rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
+ after 0 ->
+ unmonitor(R),
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+ end;
+rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) ->
+ %% R6 node
+ receive
+ {nodedown, N} ->
+ monitor_node(N, false),
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
+ after 0 ->
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+ end;
+rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) ->
+ {Replies, Badnodes}.
+
+
+%%% ---------------------------------------------------
+%%% Monitor functions
+%%% ---------------------------------------------------
+
+start_monitor(Node, Name) when is_atom(Node), is_atom(Name) ->
+ if node() =:= nonode@nohost, Node =/= nonode@nohost ->
+ Ref = make_ref(),
+ self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
+ {Node, Ref};
+ true ->
+ case catch erlang:monitor(process, {Name, Node}) of
+ {'EXIT', _} ->
+ %% Remote node is R6
+ monitor_node(Node, true),
+ Node;
+ Ref when is_reference(Ref) ->
+ {Node, Ref}
+ end
+ end.
+
+%% Cancels a monitor started with Ref=erlang:monitor(_, _).
+unmonitor(Ref) when is_reference(Ref) ->
+ erlang:demonitor(Ref),
+ receive
+ {'DOWN', Ref, _, _, _} ->
+ true
+ after 0 ->
+ true
+ end.
+
+%%% ---------------------------------------------------
+%%% Message handling functions
+%%% ---------------------------------------------------
+
+dispatch({'$gen_cast', Msg}, Mod, State) ->
+ Mod:handle_cast(Msg, State);
+dispatch(Info, Mod, State) ->
+ Mod:handle_info(Info, State).
+
+handle_msg({'$gen_call', From, Msg},
+ Parent, Name, State, Mod, TimeoutState, Queue) ->
+ case catch Mod:handle_call(Msg, From, State) of
+ {reply, Reply, NState} ->
+ reply(From, Reply),
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
+ {reply, Reply, NState, Time1} ->
+ reply(From, Reply),
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
+ {noreply, NState} ->
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
+ {noreply, NState, Time1} ->
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
+ {stop, Reason, Reply, NState} ->
+ {'EXIT', R} =
+ (catch terminate(Reason, Name, Msg, Mod, NState, [])),
+ reply(From, Reply),
+ exit(R);
+ Other -> handle_common_reply(Other, Parent, Name, Msg, Mod, State,
+ TimeoutState, Queue)
+ end;
+handle_msg(Msg,
+ Parent, Name, State, Mod, TimeoutState, Queue) ->
+ Reply = (catch dispatch(Msg, Mod, State)),
+ handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
+ TimeoutState, Queue).
+
+handle_msg({'$gen_call', From, Msg},
+ Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+ case catch Mod:handle_call(Msg, From, State) of
+ {reply, Reply, NState} ->
+ Debug1 = reply(Name, From, Reply, NState, Debug),
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
+ Debug1);
+ {reply, Reply, NState, Time1} ->
+ Debug1 = reply(Name, From, Reply, NState, Debug),
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
+ {noreply, NState} ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
+ Debug1);
+ {noreply, NState, Time1} ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
+ {stop, Reason, Reply, NState} ->
+ {'EXIT', R} =
+ (catch terminate(Reason, Name, Msg, Mod, NState, Debug)),
+ reply(Name, From, Reply, NState, Debug),
+ exit(R);
+ Other ->
+ handle_common_reply(Other, Parent, Name, Msg, Mod, State,
+ TimeoutState, Queue, Debug)
+ end;
+handle_msg(Msg,
+ Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+ Reply = (catch dispatch(Msg, Mod, State)),
+ handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
+ TimeoutState, Queue, Debug).
+
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
+ TimeoutState, Queue) ->
+ case Reply of
+ {noreply, NState} ->
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
+ {noreply, NState, Time1} ->
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
+ _ ->
+ handle_common_termination(Reply, Name, Msg, Mod, State, [])
+ end.
+
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue,
+ Debug) ->
+ case Reply of
+ {noreply, NState} ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
+ Debug1);
+ {noreply, NState, Time1} ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
+ _ ->
+ handle_common_termination(Reply, Name, Msg, Mod, State, Debug)
+ end.
+
+handle_common_termination(Reply, Name, Msg, Mod, State, Debug) ->
+ case Reply of
+ {stop, Reason, NState} ->
+ terminate(Reason, Name, Msg, Mod, NState, Debug);
+ {'EXIT', What} ->
+ terminate(What, Name, Msg, Mod, State, Debug);
+ _ ->
+ terminate({bad_return_value, Reply}, Name, Msg, Mod, State, Debug)
+ end.
+
+reply(Name, {To, Tag}, Reply, State, Debug) ->
+ reply({To, Tag}, Reply),
+ sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {out, Reply, To, State} ).
+
+
+%%-----------------------------------------------------------------
+%% Callback functions for system messages handling.
+%%-----------------------------------------------------------------
+system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) ->
+ loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug).
+
+-ifdef(use_specs).
+-spec system_terminate(_, _, _, [_]) -> no_return().
+-endif.
+
+system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time,
+ _TimeoutState, _Queue]) ->
+ terminate(Reason, Name, [], Mod, State, Debug).
+
+system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module,
+ OldVsn, Extra) ->
+ case catch Mod:code_change(OldVsn, State, Extra) of
+ {ok, NewState} ->
+ {ok, [Name, NewState, Mod, Time, TimeoutState, Queue]};
+ Else ->
+ Else
+ end.
+
+%%-----------------------------------------------------------------
+%% Format debug messages. Print them as the call-back module sees
+%% them, not as the real erlang messages. Use trace for that.
+%%-----------------------------------------------------------------
+print_event(Dev, {in, Msg}, Name) ->
+ case Msg of
+ {'$gen_call', {From, _Tag}, Call} ->
+ io:format(Dev, "*DBG* ~p got call ~p from ~w~n",
+ [Name, Call, From]);
+ {'$gen_cast', Cast} ->
+ io:format(Dev, "*DBG* ~p got cast ~p~n",
+ [Name, Cast]);
+ _ ->
+ io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg])
+ end;
+print_event(Dev, {out, Msg, To, State}, Name) ->
+ io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n",
+ [Name, Msg, To, State]);
+print_event(Dev, {noreply, State}, Name) ->
+ io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]);
+print_event(Dev, Event, Name) ->
+ io:format(Dev, "*DBG* ~p dbg ~p~n", [Name, Event]).
+
+
+%%% ---------------------------------------------------
+%%% Terminate the server.
+%%% ---------------------------------------------------
+
+terminate(Reason, Name, Msg, Mod, State, Debug) ->
+ case catch Mod:terminate(Reason, State) of
+ {'EXIT', R} ->
+ error_info(R, Reason, Name, Msg, State, Debug),
+ exit(R);
+ _ ->
+ case Reason of
+ normal ->
+ exit(normal);
+ shutdown ->
+ exit(shutdown);
+ {shutdown,_}=Shutdown ->
+ exit(Shutdown);
+ _ ->
+ error_info(Reason, undefined, Name, Msg, State, Debug),
+ exit(Reason)
+ end
+ end.
+
+error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) ->
+ %% OTP-5811 Don't send an error report if it's the system process
+ %% application_controller which is terminating - let init take care
+ %% of it instead
+ ok;
+error_info(Reason, RootCause, Name, Msg, State, Debug) ->
+ Reason1 = error_reason(Reason),
+ Fmt =
+ "** Generic server ~p terminating~n"
+ "** Last message in was ~p~n"
+ "** When Server state == ~p~n"
+ "** Reason for termination == ~n** ~p~n",
+ case RootCause of
+ undefined -> format(Fmt, [Name, Msg, State, Reason1]);
+ _ -> format(Fmt ++ "** In 'terminate' callback "
+ "with reason ==~n** ~p~n",
+ [Name, Msg, State, Reason1,
+ error_reason(RootCause)])
+ end,
+ sys:print_log(Debug),
+ ok.
+
+error_reason({undef,[{M,F,A}|MFAs]} = Reason) ->
+ case code:is_loaded(M) of
+ false -> {'module could not be loaded',[{M,F,A}|MFAs]};
+ _ -> case erlang:function_exported(M, F, length(A)) of
+ true -> Reason;
+ false -> {'function not exported',[{M,F,A}|MFAs]}
+ end
+ end;
+error_reason(Reason) ->
+ Reason.
+
+%%% ---------------------------------------------------
+%%% Misc. functions.
+%%% ---------------------------------------------------
+
+opt(Op, [{Op, Value}|_]) ->
+ {ok, Value};
+opt(Op, [_|Options]) ->
+ opt(Op, Options);
+opt(_, []) ->
+ false.
+
+debug_options(Name, Opts) ->
+ case opt(debug, Opts) of
+ {ok, Options} -> dbg_options(Name, Options);
+ _ -> dbg_options(Name, [])
+ end.
+
+dbg_options(Name, []) ->
+ Opts =
+ case init:get_argument(generic_debug) of
+ error ->
+ [];
+ _ ->
+ [log, statistics]
+ end,
+ dbg_opts(Name, Opts);
+dbg_options(Name, Opts) ->
+ dbg_opts(Name, Opts).
+
+dbg_opts(Name, Opts) ->
+ case catch sys:debug_options(Opts) of
+ {'EXIT',_} ->
+ format("~p: ignoring erroneous debug options - ~p~n",
+ [Name, Opts]),
+ [];
+ Dbg ->
+ Dbg
+ end.
+
+get_proc_name(Pid) when is_pid(Pid) ->
+ Pid;
+get_proc_name({local, Name}) ->
+ case process_info(self(), registered_name) of
+ {registered_name, Name} ->
+ Name;
+ {registered_name, _Name} ->
+ exit(process_not_registered);
+ [] ->
+ exit(process_not_registered)
+ end;
+get_proc_name({global, Name}) ->
+ case global:safe_whereis_name(Name) of
+ undefined ->
+ exit(process_not_registered_globally);
+ Pid when Pid =:= self() ->
+ Name;
+ _Pid ->
+ exit(process_not_registered_globally)
+ end.
+
+get_parent() ->
+ case get('$ancestors') of
+ [Parent | _] when is_pid(Parent)->
+ Parent;
+ [Parent | _] when is_atom(Parent)->
+ name_to_pid(Parent);
+ _ ->
+ exit(process_was_not_started_by_proc_lib)
+ end.
+
+name_to_pid(Name) ->
+ case whereis(Name) of
+ undefined ->
+ case global:safe_whereis_name(Name) of
+ undefined ->
+ exit(could_not_find_registerd_name);
+ Pid ->
+ Pid
+ end;
+ Pid ->
+ Pid
+ end.
+
+%%-----------------------------------------------------------------
+%% Status information
+%%-----------------------------------------------------------------
+format_status(Opt, StatusData) ->
+ [PDict, SysState, Parent, Debug,
+ [Name, State, Mod, _Time, _TimeoutState, Queue]] = StatusData,
+ NameTag = if is_pid(Name) ->
+ pid_to_list(Name);
+ is_atom(Name) ->
+ Name
+ end,
+ Header = lists:concat(["Status for generic server ", NameTag]),
+ Log = sys:get_debug(log, Debug, []),
+ Specfic =
+ case erlang:function_exported(Mod, format_status, 2) of
+ true ->
+ case catch Mod:format_status(Opt, [PDict, State]) of
+ {'EXIT', _} -> [{data, [{"State", State}]}];
+ Else -> Else
+ end;
+ _ ->
+ [{data, [{"State", State}]}]
+ end,
+ [{header, Header},
+ {data, [{"Status", SysState},
+ {"Parent", Parent},
+ {"Logged events", Log},
+ {"Queued messages", priority_queue:to_list(Queue)}]} |
+ Specfic].
191 src/priority_queue.erl
@@ -0,0 +1,191 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+%% Priority queues have essentially the same interface as ordinary
+%% queues, except that a) there is an in/3 that takes a priority, and
+%% b) we have only implemented the core API we need.
+%%
+%% Priorities should be integers - the higher the value the higher the
+%% priority - but we don't actually check that.
+%%
+%% in/2 inserts items with priority 0.
+%%
+%% We optimise the case where a priority queue is being used just like
+%% an ordinary queue. When that is the case we represent the priority
+%% queue as an ordinary queue. We could just call into the 'queue'
+%% module for that, but for efficiency we implement the relevant
+%% functions directly in here, thus saving on inter-module calls and
+%% eliminating a level of boxing.
+%%
+%% When the queue contains items with non-zero priorities, it is
+%% represented as a sorted kv list with the inverted Priority as the
+%% key and an ordinary queue as the value. Here again we use our own
+%% ordinary queue implemention for efficiency, often making recursive
+%% calls into the same function knowing that ordinary queues represent
+%% a base case.
+
+
+-module(priority_queue).
+
+-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3,
+ out/1, join/2]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(priority() :: integer()).
+-type(squeue() :: {queue, [any()], [any()]}).
+-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}).
+
+-spec(new/0 :: () -> pqueue()).
+-spec(is_queue/1 :: (any()) -> boolean()).
+-spec(is_empty/1 :: (pqueue()) -> boolean()).
+-spec(len/1 :: (pqueue()) -> non_neg_integer()).
+-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]).
+-spec(in/2 :: (any(), pqueue()) -> pqueue()).
+-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
+-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
+-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+new() ->
+ {queue, [], []}.
+
+is_queue({queue, R, F}) when is_list(R), is_list(F) ->
+ true;
+is_queue({pqueue, Queues}) when is_list(Queues) ->
+ lists:all(fun ({P, Q}) -> is_integer(P) andalso is_queue(Q) end,
+ Queues);
+is_queue(_) ->
+ false.
+
+is_empty({queue, [], []}) ->
+ true;
+is_empty(_) ->
+ false.
+
+len({queue, R, F}) when is_list(R), is_list(F) ->
+ length(R) + length(F);
+len({pqueue, Queues}) ->
+ lists:sum([len(Q) || {_, Q} <- Queues]).
+
+to_list({queue, In, Out}) when is_list(In), is_list(Out) ->
+ [{0, V} || V <- Out ++ lists:reverse(In, [])];
+to_list({pqueue, Queues}) ->
+ [{-P, V} || {P, Q} <- Queues, {0, V} <- to_list(Q)].
+
+in(Item, Q) ->
+ in(Item, 0, Q).
+
+in(X, 0, {queue, [_] = In, []}) ->
+ {queue, [X], In};
+in(X, 0, {queue, In, Out}) when is_list(In), is_list(Out) ->
+ {queue, [X|In], Out};
+in(X, Priority, _Q = {queue, [], []}) ->
+ in(X, Priority, {pqueue, []});
+in(X, Priority, Q = {queue, _, _}) ->
+ in(X, Priority, {pqueue, [{0, Q}]});
+in(X, Priority, {pqueue, Queues}) ->
+ P = -Priority,
+ {pqueue, case lists:keysearch(P, 1, Queues) of
+ {value, {_, Q}} ->
+ lists:keyreplace(P, 1, Queues, {P, in(X, Q)});
+ false ->
+ lists:keysort(1, [{P, {queue, [X], []}} | Queues])
+ end}.
+
+out({queue, [], []} = Q) ->
+ {empty, Q};
+out({queue, [V], []}) ->
+ {{value, V}, {queue, [], []}};
+out({queue, [Y|In], []}) ->
+ [V|Out] = lists:reverse(In, []),
+ {{value, V}, {queue, [Y], Out}};
+out({queue, In, [V]}) when is_list(In) ->
+ {{value,V}, r2f(In)};
+out({queue, In,[V|Out]}) when is_list(In) ->
+ {{value, V}, {queue, In, Out}};
+out({pqueue, [{P, Q} | Queues]}) ->
+ {R, Q1} = out(Q),
+ NewQ = case is_empty(Q1) of
+ true -> case Queues of
+ [] -> {queue, [], []};
+ [{0, OnlyQ}] -> OnlyQ;
+ [_|_] -> {pqueue, Queues}
+ end;
+ false -> {pqueue, [{P, Q1} | Queues]}
+ end,
+ {R, NewQ}.
+
+join(A, {queue, [], []}) ->
+ A;
+join({queue, [], []}, B) ->
+ B;
+join({queue, AIn, AOut}, {queue, BIn, BOut}) ->
+ {queue, BIn, AOut ++ lists:reverse(AIn, BOut)};
+join(A = {queue, _, _}, {pqueue, BPQ}) ->
+ {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ),
+ Post1 = case Post of
+ [] -> [ {0, A} ];
+ [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ];
+ _ -> [ {0, A} | Post ]
+ end,
+ {pqueue, Pre ++ Post1};
+join({pqueue, APQ}, B = {queue, _, _}) ->
+ {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ),
+ Post1 = case Post of
+ [] -> [ {0, B} ];
+ [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ];
+ _ -> [ {0, B} | Post ]
+ end,
+ {pqueue, Pre ++ Post1};
+join({pqueue, APQ}, {pqueue, BPQ}) ->
+ {pqueue, merge(APQ, BPQ, [])}.
+
+merge([], BPQ, Acc) ->
+ lists:reverse(Acc, BPQ);
+merge(APQ, [], Acc) ->
+ lists:reverse(Acc, APQ);
+merge([{P, A}|As], [{P, B}|Bs], Acc) ->
+ merge(As, Bs, [ {P, join(A, B)} | Acc ]);
+merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB ->
+ merge(As, Bs, [ {PA, A} | Acc ]);
+merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) ->
+ merge(As, Bs, [ {PB, B} | Acc ]).
+
+r2f([]) -> {queue, [], []};
+r2f([_] = R) -> {queue, [], R};
+r2f([X,Y]) -> {queue, [X], [Y]};
+r2f([X,Y|R]) -> {queue, [X,Y], lists:reverse(R, [])}.
173 src/rabbit_basic.erl
@@ -0,0 +1,173 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_basic).
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+
+-export([publish/1, message/4, properties/1, delivery/4]).
+-export([publish/4, publish/7]).
+-export([build_content/2, from_content/1]).
+-export([is_message_persistent/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(properties_input() ::
+ (rabbit_framing:amqp_property_record() | [{atom(), any()}])).
+-type(publish_result() ::
+ ({ok, rabbit_router:routing_result(), [pid()]}
+ | rabbit_types:error('not_found'))).
+
+-spec(publish/1 ::
+ (rabbit_types:delivery()) -> publish_result()).
+-spec(delivery/4 ::
+ (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
+ rabbit_types:message()) -> rabbit_types:delivery()).
+-spec(message/4 ::
+ (rabbit_exchange:name(), rabbit_router:routing_key(),
+ properties_input(), binary())
+ -> (rabbit_types:message() | rabbit_types:error(any()))).
+-spec(properties/1 ::
+ (properties_input()) -> rabbit_framing:amqp_property_record()).
+-spec(publish/4 ::
+ (rabbit_exchange:name(), rabbit_router:routing_key(),
+ properties_input(), binary())
+ -> publish_result()).
+-spec(publish/7 ::
+ (rabbit_exchange:name(), rabbit_router:routing_key(),
+ boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
+ properties_input(), binary())
+ -> publish_result()).
+-spec(build_content/2 ::
+ (rabbit_framing:amqp_property_record(), binary())
+ -> rabbit_types:content()).
+-spec(from_content/1 ::
+ (rabbit_types:content())
+ -> {rabbit_framing:amqp_property_record(), binary()}).
+-spec(is_message_persistent/1 ::
+ (rabbit_types:decoded_content())
+ -> (boolean() | {'invalid', non_neg_integer()})).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+publish(Delivery = #delivery{
+ message = #basic_message{exchange_name = ExchangeName}}) ->
+ case rabbit_exchange:lookup(ExchangeName) of
+ {ok, X} ->
+ {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery),
+ {ok, RoutingRes, DeliveredQPids};
+ Other ->
+ Other
+ end.
+
+delivery(Mandatory, Immediate, Txn, Message) ->
+ #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn,
+ sender = self(), message = Message}.
+
+build_content(Properties, BodyBin) ->
+ %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
+ {ClassId, _MethodId} =
+ rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
+ #content{class_id = ClassId,
+ properties = Properties,
+ properties_bin = none,
+ protocol = none,
+ payload_fragments_rev = [BodyBin]}.
+
+from_content(Content) ->
+ #content{class_id = ClassId,
+ properties = Props,
+ payload_fragments_rev = FragmentsRev} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
+ {ClassId, _MethodId} =
+ rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
+ {Props, list_to_binary(lists:reverse(FragmentsRev))}.
+
+message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
+ Properties = properties(RawProperties),
+ Content = build_content(Properties, BodyBin),
+ case is_message_persistent(Content) of
+ {invalid, Other} ->
+ {error, {invalid_delivery_mode, Other}};
+ IsPersistent when is_boolean(IsPersistent) ->
+ #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKeyBin,
+ content = Content,
+ guid = rabbit_guid:guid(),
+ is_persistent = IsPersistent}
+ end.
+
+properties(P = #'P_basic'{}) ->
+ P;
+properties(P) when is_list(P) ->
+ %% Yes, this is O(length(P) * record_info(size, 'P_basic') / 2),
+ %% i.e. slow. Use the definition of 'P_basic' directly if
+ %% possible!
+ lists:foldl(fun ({Key, Value}, Acc) ->
+ case indexof(record_info(fields, 'P_basic'), Key) of
+ 0 -> throw({unknown_basic_property, Key});
+ N -> setelement(N + 1, Acc, Value)
+ end
+ end, #'P_basic'{}, P).
+
+indexof(L, Element) -> indexof(L, Element, 1).
+
+indexof([], _Element, _N) -> 0;
+indexof([Element | _Rest], Element, N) -> N;
+indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1).
+
+%% Convenience function, for avoiding round-trips in calls across the
+%% erlang distributed network.
+publish(ExchangeName, RoutingKeyBin, Properties, BodyBin) ->
+ publish(ExchangeName, RoutingKeyBin, false, false, none, Properties,
+ BodyBin).
+
+%% Convenience function, for avoiding round-trips in calls across the
+%% erlang distributed network.
+publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties,
+ BodyBin) ->
+ publish(delivery(Mandatory, Immediate, Txn,
+ message(ExchangeName, RoutingKeyBin,
+ properties(Properties), BodyBin))).
+
+is_message_persistent(#content{properties = #'P_basic'{
+ delivery_mode = Mode}}) ->
+ case Mode of
+ 1 -> false;
+ 2 -> true;
+ undefined -> false;
+ Other -> {invalid, Other}
+ end.
308 src/rabbit_binary_generator.erl
@@ -0,0 +1,308 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_binary_generator).
+-include("rabbit_framing.hrl").
+-include("rabbit.hrl").
+
+% EMPTY_CONTENT_BODY_FRAME_SIZE, 8 = 1 + 2 + 4 + 1
+% - 1 byte of frame type
+% - 2 bytes of channel number
+% - 4 bytes of frame payload length
+% - 1 byte of payload trailer FRAME_END byte
+% See definition of check_empty_content_body_frame_size/0, an assertion called at startup.
+-define(EMPTY_CONTENT_BODY_FRAME_SIZE, 8).
+
+-export([build_simple_method_frame/3,
+ build_simple_content_frames/4,
+ build_heartbeat_frame/0]).
+-export([generate_table/1, encode_properties/2]).
+-export([check_empty_content_body_frame_size/0]).
+-export([ensure_content_encoded/2, clear_encoded_content/1]).
+
+-import(lists).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(frame() :: [binary()]).
+
+-spec(build_simple_method_frame/3 ::
+ (rabbit_channel:channel_number(), rabbit_framing:amqp_method_record(),
+ rabbit_types:protocol())
+ -> frame()).
+-spec(build_simple_content_frames/4 ::
+ (rabbit_channel:channel_number(), rabbit_types:content(),
+ non_neg_integer(), rabbit_types:protocol())
+ -> [frame()]).
+-spec(build_heartbeat_frame/0 :: () -> frame()).
+-spec(generate_table/1 :: (rabbit_framing:amqp_table()) -> binary()).
+-spec(encode_properties/2 ::
+ ([rabbit_framing:amqp_property_type()], [any()]) -> binary()).
+-spec(check_empty_content_body_frame_size/0 :: () -> 'ok').
+-spec(ensure_content_encoded/2 ::
+ (rabbit_types:content(), rabbit_types:protocol()) ->
+ rabbit_types:encoded_content()).
+-spec(clear_encoded_content/1 ::
+ (rabbit_types:content()) -> rabbit_types:unencoded_content()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+build_simple_method_frame(ChannelInt, MethodRecord, Protocol) ->
+ MethodFields = Protocol:encode_method_fields(MethodRecord),
+ MethodName = rabbit_misc:method_record_type(MethodRecord),
+ {ClassId, MethodId} = Protocol:method_id(MethodName),
+ create_frame(1, ChannelInt, [<<ClassId:16, MethodId:16>>, MethodFields]).
+
+build_simple_content_frames(ChannelInt, Content, FrameMax, Protocol) ->
+ #content{class_id = ClassId,
+ properties_bin = ContentPropertiesBin,
+ payload_fragments_rev = PayloadFragmentsRev} =
+ ensure_content_encoded(Content, Protocol),
+ {BodySize, ContentFrames} =
+ build_content_frames(PayloadFragmentsRev, FrameMax, ChannelInt),
+ HeaderFrame = create_frame(2, ChannelInt,
+ [<<ClassId:16, 0:16, BodySize:64>>,
+ ContentPropertiesBin]),
+ [HeaderFrame | ContentFrames].
+
+build_content_frames(FragsRev, FrameMax, ChannelInt) ->
+ BodyPayloadMax = if FrameMax == 0 ->
+ iolist_size(FragsRev);
+ true ->
+ FrameMax - ?EMPTY_CONTENT_BODY_FRAME_SIZE
+ end,
+ build_content_frames(0, [], BodyPayloadMax, [],
+ lists:reverse(FragsRev), BodyPayloadMax, ChannelInt).
+
+build_content_frames(SizeAcc, FramesAcc, _FragSizeRem, [],
+ [], _BodyPayloadMax, _ChannelInt) ->
+ {SizeAcc, lists:reverse(FramesAcc)};
+build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc,
+ Frags, BodyPayloadMax, ChannelInt)
+ when FragSizeRem == 0 orelse Frags == [] ->
+ Frame = create_frame(3, ChannelInt, lists:reverse(FragAcc)),
+ FrameSize = BodyPayloadMax - FragSizeRem,
+ build_content_frames(SizeAcc + FrameSize, [Frame | FramesAcc],
+ BodyPayloadMax, [], Frags, BodyPayloadMax, ChannelInt);
+build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc,
+ [Frag | Frags], BodyPayloadMax, ChannelInt) ->
+ Size = size(Frag),
+ {NewFragSizeRem, NewFragAcc, NewFrags} =
+ if Size == 0 -> {FragSizeRem, FragAcc, Frags};
+ Size =< FragSizeRem -> {FragSizeRem - Size, [Frag | FragAcc], Frags};
+ true -> <<Head:FragSizeRem/binary, Tail/binary>> =
+ Frag,
+ {0, [Head | FragAcc], [Tail | Frags]}
+ end,
+ build_content_frames(SizeAcc, FramesAcc, NewFragSizeRem, NewFragAcc,
+ NewFrags, BodyPayloadMax, ChannelInt).
+
+build_heartbeat_frame() ->
+ create_frame(?FRAME_HEARTBEAT, 0, <<>>).
+
+create_frame(TypeInt, ChannelInt, PayloadBin) when is_binary(PayloadBin) ->
+ [<<TypeInt:8, ChannelInt:16, (size(PayloadBin)):32>>, PayloadBin, <<?FRAME_END>>];
+create_frame(TypeInt, ChannelInt, Payload) ->
+ create_frame(TypeInt, ChannelInt, list_to_binary(Payload)).
+
+%% table_field_to_binary supports the AMQP 0-8/0-9 standard types, S,
+%% I, D, T and F, as well as the QPid extensions b, d, f, l, s, t, x,
+%% and V.
+
+table_field_to_binary({FName, Type, Value}) ->
+ [short_string_to_binary(FName) | field_value_to_binary(Type, Value)].
+
+field_value_to_binary(longstr, Value) ->
+ ["S", long_string_to_binary(Value)];
+
+field_value_to_binary(signedint, Value) ->
+ ["I", <<Value:32/signed>>];
+
+field_value_to_binary(decimal, {Before, After}) ->
+ ["D", Before, <<After:32>>];
+
+field_value_to_binary(timestamp, Value) ->
+ ["T", <<Value:64>>];
+
+field_value_to_binary(table, Value) ->
+ ["F", table_to_binary(Value)];
+
+field_value_to_binary(array, Value) ->
+ ["A", array_to_binary(Value)];
+
+field_value_to_binary(byte, Value) ->
+ ["b", <<Value:8/unsigned>>];
+
+field_value_to_binary(double, Value) ->
+ ["d", <<Value:64/float>>];
+
+field_value_to_binary(float, Value) ->
+ ["f", <<Value:32/float>>];
+
+field_value_to_binary(long, Value) ->
+ ["l", <<Value:64/signed>>];
+
+field_value_to_binary(short, Value) ->
+ ["s", <<Value:16/signed>>];
+
+field_value_to_binary(bool, Value) ->
+ ["t", if Value -> 1; true -> 0 end];
+
+field_value_to_binary(binary, Value) ->
+ ["x", long_string_to_binary(Value)];
+
+field_value_to_binary(void, _Value) ->
+ ["V"].
+
+table_to_binary(Table) when is_list(Table) ->
+ BinTable = generate_table(Table),
+ [<<(size(BinTable)):32>>, BinTable].
+
+array_to_binary(Array) when is_list(Array) ->
+ BinArray = generate_array(Array),