diff --git a/CREDITS b/CREDITS new file mode 100644 index 0000000..d043f38 --- /dev/null +++ b/CREDITS @@ -0,0 +1,10 @@ + +Following people contributed to this project: + +Barry Pederson - author of original Python lib +Vadim Zaliva - PHP paort +taavi013@gmail.com patches - patches +Sean Murphy http://code.google.com/u/sgmurphy/ - patches +spiderbill http://code.google.com/u/spiderbill/ - patches + + diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..3b473db --- /dev/null +++ b/LICENSE @@ -0,0 +1,458 @@ + GNU LESSER GENERAL PUBLIC LICENSE + Version 2.1, February 1999 + + Copyright (C) 1991, 1999 Free Software Foundation, Inc. + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + +[This is the first released version of the Lesser GPL. It also counts + as the successor of the GNU Library Public License, version 2, hence + the version number 2.1.] + + Preamble + + The licenses for most software are designed to take away your +freedom to share and change it. By contrast, the GNU General Public +Licenses are intended to guarantee your freedom to share and change +free software--to make sure the software is free for all its users. + + This license, the Lesser General Public License, applies to some +specially designated software packages--typically libraries--of the +Free Software Foundation and other authors who decide to use it. You +can use it too, but we suggest you first think carefully about whether +this license or the ordinary General Public License is the better +strategy to use in any particular case, based on the explanations below. + + When we speak of free software, we are referring to freedom of use, +not price. Our General Public Licenses are designed to make sure that +you have the freedom to distribute copies of free software (and charge +for this service if you wish); that you receive source code or can get +it if you want it; that you can change the software and use pieces of +it in new free programs; and that you are informed that you can do +these things. + + To protect your rights, we need to make restrictions that forbid +distributors to deny you these rights or to ask you to surrender these +rights. These restrictions translate to certain responsibilities for +you if you distribute copies of the library or if you modify it. + + For example, if you distribute copies of the library, whether gratis +or for a fee, you must give the recipients all the rights that we gave +you. You must make sure that they, too, receive or can get the source +code. If you link other code with the library, you must provide +complete object files to the recipients, so that they can relink them +with the library after making changes to the library and recompiling +it. And you must show them these terms so they know their rights. + + We protect your rights with a two-step method: (1) we copyright the +library, and (2) we offer you this license, which gives you legal +permission to copy, distribute and/or modify the library. + + To protect each distributor, we want to make it very clear that +there is no warranty for the free library. Also, if the library is +modified by someone else and passed on, the recipients should know +that what they have is not the original version, so that the original +author's reputation will not be affected by problems that might be +introduced by others. + + Finally, software patents pose a constant threat to the existence of +any free program. We wish to make sure that a company cannot +effectively restrict the users of a free program by obtaining a +restrictive license from a patent holder. Therefore, we insist that +any patent license obtained for a version of the library must be +consistent with the full freedom of use specified in this license. + + Most GNU software, including some libraries, is covered by the +ordinary GNU General Public License. This license, the GNU Lesser +General Public License, applies to certain designated libraries, and +is quite different from the ordinary General Public License. We use +this license for certain libraries in order to permit linking those +libraries into non-free programs. + + When a program is linked with a library, whether statically or using +a shared library, the combination of the two is legally speaking a +combined work, a derivative of the original library. The ordinary +General Public License therefore permits such linking only if the +entire combination fits its criteria of freedom. The Lesser General +Public License permits more lax criteria for linking other code with +the library. + + We call this license the "Lesser" General Public License because it +does Less to protect the user's freedom than the ordinary General +Public License. It also provides other free software developers Less +of an advantage over competing non-free programs. These disadvantages +are the reason we use the ordinary General Public License for many +libraries. However, the Lesser license provides advantages in certain +special circumstances. + + For example, on rare occasions, there may be a special need to +encourage the widest possible use of a certain library, so that it becomes +a de-facto standard. To achieve this, non-free programs must be +allowed to use the library. A more frequent case is that a free +library does the same job as widely used non-free libraries. In this +case, there is little to gain by limiting the free library to free +software only, so we use the Lesser General Public License. + + In other cases, permission to use a particular library in non-free +programs enables a greater number of people to use a large body of +free software. For example, permission to use the GNU C Library in +non-free programs enables many more people to use the whole GNU +operating system, as well as its variant, the GNU/Linux operating +system. + + Although the Lesser General Public License is Less protective of the +users' freedom, it does ensure that the user of a program that is +linked with the Library has the freedom and the wherewithal to run +that program using a modified version of the Library. + + The precise terms and conditions for copying, distribution and +modification follow. Pay close attention to the difference between a +"work based on the library" and a "work that uses the library". The +former contains code derived from the library, whereas the latter must +be combined with the library in order to run. + + GNU LESSER GENERAL PUBLIC LICENSE + TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION + + 0. This License Agreement applies to any software library or other +program which contains a notice placed by the copyright holder or +other authorized party saying it may be distributed under the terms of +this Lesser General Public License (also called "this License"). +Each licensee is addressed as "you". + + A "library" means a collection of software functions and/or data +prepared so as to be conveniently linked with application programs +(which use some of those functions and data) to form executables. + + The "Library", below, refers to any such software library or work +which has been distributed under these terms. A "work based on the +Library" means either the Library or any derivative work under +copyright law: that is to say, a work containing the Library or a +portion of it, either verbatim or with modifications and/or translated +straightforwardly into another language. (Hereinafter, translation is +included without limitation in the term "modification".) + + "Source code" for a work means the preferred form of the work for +making modifications to it. For a library, complete source code means +all the source code for all modules it contains, plus any associated +interface definition files, plus the scripts used to control compilation +and installation of the library. + + Activities other than copying, distribution and modification are not +covered by this License; they are outside its scope. The act of +running a program using the Library is not restricted, and output from +such a program is covered only if its contents constitute a work based +on the Library (independent of the use of the Library in a tool for +writing it). Whether that is true depends on what the Library does +and what the program that uses the Library does. + + 1. You may copy and distribute verbatim copies of the Library's +complete source code as you receive it, in any medium, provided that +you conspicuously and appropriately publish on each copy an +appropriate copyright notice and disclaimer of warranty; keep intact +all the notices that refer to this License and to the absence of any +warranty; and distribute a copy of this License along with the +Library. + + You may charge a fee for the physical act of transferring a copy, +and you may at your option offer warranty protection in exchange for a +fee. + + 2. You may modify your copy or copies of the Library or any portion +of it, thus forming a work based on the Library, and copy and +distribute such modifications or work under the terms of Section 1 +above, provided that you also meet all of these conditions: + + a) The modified work must itself be a software library. + + b) You must cause the files modified to carry prominent notices + stating that you changed the files and the date of any change. + + c) You must cause the whole of the work to be licensed at no + charge to all third parties under the terms of this License. + + d) If a facility in the modified Library refers to a function or a + table of data to be supplied by an application program that uses + the facility, other than as an argument passed when the facility + is invoked, then you must make a good faith effort to ensure that, + in the event an application does not supply such function or + table, the facility still operates, and performs whatever part of + its purpose remains meaningful. + + (For example, a function in a library to compute square roots has + a purpose that is entirely well-defined independent of the + application. Therefore, Subsection 2d requires that any + application-supplied function or table used by this function must + be optional: if the application does not supply it, the square + root function must still compute square roots.) + +These requirements apply to the modified work as a whole. If +identifiable sections of that work are not derived from the Library, +and can be reasonably considered independent and separate works in +themselves, then this License, and its terms, do not apply to those +sections when you distribute them as separate works. But when you +distribute the same sections as part of a whole which is a work based +on the Library, the distribution of the whole must be on the terms of +this License, whose permissions for other licensees extend to the +entire whole, and thus to each and every part regardless of who wrote +it. + +Thus, it is not the intent of this section to claim rights or contest +your rights to work written entirely by you; rather, the intent is to +exercise the right to control the distribution of derivative or +collective works based on the Library. + +In addition, mere aggregation of another work not based on the Library +with the Library (or with a work based on the Library) on a volume of +a storage or distribution medium does not bring the other work under +the scope of this License. + + 3. You may opt to apply the terms of the ordinary GNU General Public +License instead of this License to a given copy of the Library. To do +this, you must alter all the notices that refer to this License, so +that they refer to the ordinary GNU General Public License, version 2, +instead of to this License. (If a newer version than version 2 of the +ordinary GNU General Public License has appeared, then you can specify +that version instead if you wish.) Do not make any other change in +these notices. + + Once this change is made in a given copy, it is irreversible for +that copy, so the ordinary GNU General Public License applies to all +subsequent copies and derivative works made from that copy. + + This option is useful when you wish to copy part of the code of +the Library into a program that is not a library. + + 4. You may copy and distribute the Library (or a portion or +derivative of it, under Section 2) in object code or executable form +under the terms of Sections 1 and 2 above provided that you accompany +it with the complete corresponding machine-readable source code, which +must be distributed under the terms of Sections 1 and 2 above on a +medium customarily used for software interchange. + + If distribution of object code is made by offering access to copy +from a designated place, then offering equivalent access to copy the +source code from the same place satisfies the requirement to +distribute the source code, even though third parties are not +compelled to copy the source along with the object code. + + 5. A program that contains no derivative of any portion of the +Library, but is designed to work with the Library by being compiled or +linked with it, is called a "work that uses the Library". Such a +work, in isolation, is not a derivative work of the Library, and +therefore falls outside the scope of this License. + + However, linking a "work that uses the Library" with the Library +creates an executable that is a derivative of the Library (because it +contains portions of the Library), rather than a "work that uses the +library". The executable is therefore covered by this License. +Section 6 states terms for distribution of such executables. + + When a "work that uses the Library" uses material from a header file +that is part of the Library, the object code for the work may be a +derivative work of the Library even though the source code is not. +Whether this is true is especially significant if the work can be +linked without the Library, or if the work is itself a library. The +threshold for this to be true is not precisely defined by law. + + If such an object file uses only numerical parameters, data +structure layouts and accessors, and small macros and small inline +functions (ten lines or less in length), then the use of the object +file is unrestricted, regardless of whether it is legally a derivative +work. (Executables containing this object code plus portions of the +Library will still fall under Section 6.) + + Otherwise, if the work is a derivative of the Library, you may +distribute the object code for the work under the terms of Section 6. +Any executables containing that work also fall under Section 6, +whether or not they are linked directly with the Library itself. + + 6. As an exception to the Sections above, you may also combine or +link a "work that uses the Library" with the Library to produce a +work containing portions of the Library, and distribute that work +under terms of your choice, provided that the terms permit +modification of the work for the customer's own use and reverse +engineering for debugging such modifications. + + You must give prominent notice with each copy of the work that the +Library is used in it and that the Library and its use are covered by +this License. You must supply a copy of this License. If the work +during execution displays copyright notices, you must include the +copyright notice for the Library among them, as well as a reference +directing the user to the copy of this License. Also, you must do one +of these things: + + a) Accompany the work with the complete corresponding + machine-readable source code for the Library including whatever + changes were used in the work (which must be distributed under + Sections 1 and 2 above); and, if the work is an executable linked + with the Library, with the complete machine-readable "work that + uses the Library", as object code and/or source code, so that the + user can modify the Library and then relink to produce a modified + executable containing the modified Library. (It is understood + that the user who changes the contents of definitions files in the + Library will not necessarily be able to recompile the application + to use the modified definitions.) + + b) Use a suitable shared library mechanism for linking with the + Library. A suitable mechanism is one that (1) uses at run time a + copy of the library already present on the user's computer system, + rather than copying library functions into the executable, and (2) + will operate properly with a modified version of the library, if + the user installs one, as long as the modified version is + interface-compatible with the version that the work was made with. + + c) Accompany the work with a written offer, valid for at + least three years, to give the same user the materials + specified in Subsection 6a, above, for a charge no more + than the cost of performing this distribution. + + d) If distribution of the work is made by offering access to copy + from a designated place, offer equivalent access to copy the above + specified materials from the same place. + + e) Verify that the user has already received a copy of these + materials or that you have already sent this user a copy. + + For an executable, the required form of the "work that uses the +Library" must include any data and utility programs needed for +reproducing the executable from it. However, as a special exception, +the materials to be distributed need not include anything that is +normally distributed (in either source or binary form) with the major +components (compiler, kernel, and so on) of the operating system on +which the executable runs, unless that component itself accompanies +the executable. + + It may happen that this requirement contradicts the license +restrictions of other proprietary libraries that do not normally +accompany the operating system. Such a contradiction means you cannot +use both them and the Library together in an executable that you +distribute. + + 7. You may place library facilities that are a work based on the +Library side-by-side in a single library together with other library +facilities not covered by this License, and distribute such a combined +library, provided that the separate distribution of the work based on +the Library and of the other library facilities is otherwise +permitted, and provided that you do these two things: + + a) Accompany the combined library with a copy of the same work + based on the Library, uncombined with any other library + facilities. This must be distributed under the terms of the + Sections above. + + b) Give prominent notice with the combined library of the fact + that part of it is a work based on the Library, and explaining + where to find the accompanying uncombined form of the same work. + + 8. You may not copy, modify, sublicense, link with, or distribute +the Library except as expressly provided under this License. Any +attempt otherwise to copy, modify, sublicense, link with, or +distribute the Library is void, and will automatically terminate your +rights under this License. However, parties who have received copies, +or rights, from you under this License will not have their licenses +terminated so long as such parties remain in full compliance. + + 9. You are not required to accept this License, since you have not +signed it. However, nothing else grants you permission to modify or +distribute the Library or its derivative works. These actions are +prohibited by law if you do not accept this License. Therefore, by +modifying or distributing the Library (or any work based on the +Library), you indicate your acceptance of this License to do so, and +all its terms and conditions for copying, distributing or modifying +the Library or works based on it. + + 10. Each time you redistribute the Library (or any work based on the +Library), the recipient automatically receives a license from the +original licensor to copy, distribute, link with or modify the Library +subject to these terms and conditions. You may not impose any further +restrictions on the recipients' exercise of the rights granted herein. +You are not responsible for enforcing compliance by third parties with +this License. + + 11. If, as a consequence of a court judgment or allegation of patent +infringement or for any other reason (not limited to patent issues), +conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot +distribute so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you +may not distribute the Library at all. For example, if a patent +license would not permit royalty-free redistribution of the Library by +all those who receive copies directly or indirectly through you, then +the only way you could satisfy both it and this License would be to +refrain entirely from distribution of the Library. + +If any portion of this section is held invalid or unenforceable under any +particular circumstance, the balance of the section is intended to apply, +and the section as a whole is intended to apply in other circumstances. + +It is not the purpose of this section to induce you to infringe any +patents or other property right claims or to contest validity of any +such claims; this section has the sole purpose of protecting the +integrity of the free software distribution system which is +implemented by public license practices. Many people have made +generous contributions to the wide range of software distributed +through that system in reliance on consistent application of that +system; it is up to the author/donor to decide if he or she is willing +to distribute software through any other system and a licensee cannot +impose that choice. + +This section is intended to make thoroughly clear what is believed to +be a consequence of the rest of this License. + + 12. If the distribution and/or use of the Library is restricted in +certain countries either by patents or by copyrighted interfaces, the +original copyright holder who places the Library under this License may add +an explicit geographical distribution limitation excluding those countries, +so that distribution is permitted only in or among countries not thus +excluded. In such case, this License incorporates the limitation as if +written in the body of this License. + + 13. The Free Software Foundation may publish revised and/or new +versions of the Lesser General Public License from time to time. +Such new versions will be similar in spirit to the present version, +but may differ in detail to address new problems or concerns. + +Each version is given a distinguishing version number. If the Library +specifies a version number of this License which applies to it and +"any later version", you have the option of following the terms and +conditions either of that version or of any later version published by +the Free Software Foundation. If the Library does not specify a +license version number, you may choose any version ever published by +the Free Software Foundation. + + 14. If you wish to incorporate parts of the Library into other free +programs whose distribution conditions are incompatible with these, +write to the author to ask for permission. For software which is +copyrighted by the Free Software Foundation, write to the Free +Software Foundation; we sometimes make exceptions for this. Our +decision will be guided by the two goals of preserving the free status +of all derivatives of our free software and of promoting the sharing +and reuse of software generally. + + NO WARRANTY + + 15. BECAUSE THE LIBRARY IS LICENSED FREE OF CHARGE, THERE IS NO +WARRANTY FOR THE LIBRARY, TO THE EXTENT PERMITTED BY APPLICABLE LAW. +EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR +OTHER PARTIES PROVIDE THE LIBRARY "AS IS" WITHOUT WARRANTY OF ANY +KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE +LIBRARY IS WITH YOU. SHOULD THE LIBRARY PROVE DEFECTIVE, YOU ASSUME +THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION. + + 16. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN +WRITING WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY +AND/OR REDISTRIBUTE THE LIBRARY AS PERMITTED ABOVE, BE LIABLE TO YOU +FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR +CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE +LIBRARY (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING +RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A +FAILURE OF THE LIBRARY TO OPERATE WITH ANY OTHER SOFTWARE), EVEN IF +SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH +DAMAGES. + + END OF TERMS AND CONDITIONS diff --git a/README.md b/README.md new file mode 100644 index 0000000..2e705d8 --- /dev/null +++ b/README.md @@ -0,0 +1,34 @@ +# NOTE # + +This library is a fork of the [php-amqplib](http://code.google.com/p/php-amqplib/) library. + +We modified that library in order to work with PHP 5.3 Strict. + +Also we improved the debug method to increase performance. + +We use it daily in prod for sending/consuming 600K + messages per day. + +Below is the original README file content. Credits goes to the original authors. + +# Original README: # + +PHP library implementing Advanced Message Queuing Protocol (AMQP). + +The library is port of python code of py-amqplib +http://barryp.org/software/py-amqplib/ + +It have been tested with RabbitMQ server. + +Project home page: http://code.google.com/p/php-amqplib/ + +For discussion, please join the group: + +http://groups.google.com/group/php-amqplib-devel + +For bug reports, please use bug tracking system at the project page. + +Patched are very welcome! + +Author: Vadim Zaliva + + diff --git a/amqp.inc b/amqp.inc new file mode 100644 index 0000000..d75269c --- /dev/null +++ b/amqp.inc @@ -0,0 +1,1524 @@ + + * + */ + +require_once('amqp_wire.inc'); +require_once('hexdump.inc'); + +function debug_msg($s) +{ + echo $s, "\n"; +} + +function methodSig($a) +{ + if(is_string($a)) + return $a; + else + return sprintf("%d,%d",$a[0] ,$a[1]); +} + + +class AMQPException extends Exception +{ + public function __construct($reply_code, $reply_text, $method_sig) + { + parent::__construct(NULL,0); + + $this->amqp_reply_code = $reply_code; + $this->amqp_reply_text = $reply_text; + $this->amqp_method_sig = $method_sig; + + global $METHOD_NAME_MAP; + $ms=methodSig($method_sig); + if(array_key_exists($ms, $METHOD_NAME_MAP)) + $mn = $METHOD_NAME_MAP[$ms]; + else + $mn = ""; + $this->args = array( + $reply_code, + $reply_text, + $method_sig, + $mn + ); + } +} + +class AMQPConnectionException extends AMQPException +{ + public function __construct($reply_code, $reply_text, $method_sig) + { + parent::__construct($reply_code, $reply_text, $method_sig); + } +} + +class AMQPChannelException extends AMQPException +{ + public function __construct($reply_code, $reply_text, $method_sig) + { + parent::__construct($reply_code, $reply_text, $method_sig); + } + +} + + +$METHOD_NAME_MAP = array( + "10,10" => "Connection.start", + "10,11" => "Connection.start_ok", + "10,20" => "Connection.secure", + "10,21" => "Connection.secure_ok", + "10,30" => "Connection.tune", + "10,31" => "Connection.tune_ok", + "10,40" => "Connection.open", + "10,41" => "Connection.open_ok", + "10,50" => "Connection.redirect", + "10,60" => "Connection.close", + "10,61" => "Connection.close_ok", + "20,10" => "Channel.open", + "20,11" => "Channel.open_ok", + "20,20" => "Channel.flow", + "20,21" => "Channel.flow_ok", + "20,30" => "Channel.alert", + "20,40" => "Channel.close", + "20,41" => "Channel.close_ok", + "30,10" => "Channel.access_request", + "30,11" => "Channel.access_request_ok", + "40,10" => "Channel.exchange_declare", + "40,11" => "Channel.exchange_declare_ok", + "40,20" => "Channel.exchange_delete", + "40,21" => "Channel.exchange_delete_ok", + "50,10" => "Channel.queue_declare", + "50,11" => "Channel.queue_declare_ok", + "50,20" => "Channel.queue_bind", + "50,21" => "Channel.queue_bind_ok", + "50,30" => "Channel.queue_purge", + "50,31" => "Channel.queue_purge_ok", + "50,40" => "Channel.queue_delete", + "50,41" => "Channel.queue_delete_ok", + "60,10" => "Channel.basic_qos", + "60,11" => "Channel.basic_qos_ok", + "60,20" => "Channel.basic_consume", + "60,21" => "Channel.basic_consume_ok", + "60,30" => "Channel.basic_cancel", + "60,31" => "Channel.basic_cancel_ok", + "60,40" => "Channel.basic_publish", + "60,50" => "Channel.basic_return", + "60,60" => "Channel.basic_deliver", + "60,70" => "Channel.basic_get", + "60,71" => "Channel.basic_get_ok", + "60,72" => "Channel.basic_get_empty", + "60,80" => "Channel.basic_ack", + "60,90" => "Channel.basic_reject", + "60,100" => "Channel.basic_recover", + "90,10" => "Channel.tx_select", + "90,11" => "Channel.tx_select_ok", + "90,20" => "Channel.tx_commit", + "90,21" => "Channel.tx_commit_ok", + "90,30" => "Channel.tx_rollback", + "90,31" => "Channel.tx_rollback_ok" +); + +class AbstractChannel +{ + private static $CONTENT_METHODS = array( + "60,60", // Basic.deliver + "60,71", // Basic.get_ok + ); + + private static $CLOSE_METHODS = array( + "10,60", // Connection.close + "20,40", // Channel.close + ); + + protected $debug; + + public function __construct($connection, $channel_id) + { + $this->connection = $connection; + $this->channel_id = $channel_id; + $connection->channels[$channel_id] = $this; + $this->frame_queue = array(); // Lower level queue for frames + $this->method_queue = array(); // Higher level queue for methods + $this->auto_decode = false; + $this->debug = defined('AMQP_DEBUG') ? AMQP_DEBUG : false; + } + + public function getChannelId() + { + return $this->channel_id; + } + + + function dispatch($method_sig, $args, $content) + { + if(!array_key_exists($method_sig, $this->method_map)) + throw new Exception("Unknown AMQP method $method_sig"); + + $amqp_method = $this->method_map[$method_sig]; + if($content == NULL) + return call_user_func(array($this,$amqp_method), $args); + else + return call_user_func(array($this,$amqp_method), $args, $content); + } + + function next_frame() + { + if($this->debug) + { + debug_msg("waiting for a new frame"); + } + if($this->frame_queue != NULL) + return array_pop($this->frame_queue); + return $this->connection->wait_channel($this->channel_id); + } + + protected function send_method_frame($method_sig, $args="") + { + $this->connection->send_channel_method_frame($this->channel_id, $method_sig, $args); + } + + function wait_content() + { + $frm = $this->next_frame(); + $frame_type = $frm[0]; + $payload = $frm[1]; + if($frame_type != 2) + throw new Exception("Expecting Content header"); + + $payload_reader = new AMQPReader(substr($payload,0,12)); + $class_id = $payload_reader->read_short(); + $weight = $payload_reader->read_short(); + + $body_size = $payload_reader->read_longlong(); + $msg = new AMQPMessage(); + $msg->load_properties(substr($payload,12)); + + $body_parts = array(); + $body_received = 0; + while(bccomp($body_size,$body_received)==1) + { + $frm = $this->next_frame(); + $frame_type = $frm[0]; + $payload = $frm[1]; + if($frame_type != 3) + throw new Exception("Expecting Content body, received frame type $frame_type"); + $body_parts[] = $payload; + $body_received = bcadd($body_received, strlen($payload)); + } + + $msg->body = implode("",$body_parts); + + if($this->auto_decode and isset($msg->content_encoding)) + { + try + { + $msg->body = $msg->body->decode($msg->content_encoding); + } catch (Exception $e) { + if($this->debug) + { + debug_msg("Ignoring body decoding exception: " . $e->getMessage()); + } + } + } + + return $msg; + } + + /** + * Wait for some expected AMQP methods and dispatch to them. + * Unexpected methods are queued up for later calls to this Python + * method. + */ + public function wait($allowed_methods=NULL) + { + if($allowed_methods) + { + if($this->debug) + { + debug_msg("waiting for " . implode(", ", $allowed_methods)); + } + } + else + { + if($this->debug) + { + debug_msg("waiting for any method"); + } + } + + //Process deferred methods + foreach($this->method_queue as $qk=>$queued_method) + { + if($this->debug) + { + debug_msg("checking queue method " . $qk); + } + + $method_sig = $queued_method[0]; + if($allowed_methods==NULL || in_array($method_sig, $allowed_methods)) + { + unset($this->method_queue[$qk]); + + if($this->debug) + { + debug_msg("Executing queued method: $method_sig: " . $METHOD_NAME_MAP[methodSig($method_sig)]); + } + + return $this->dispatch($queued_method[0], + $queued_method[1], + $queued_method[2]); + } + } + + // No deferred methods? wait for new ones + while(true) + { + $frm = $this->next_frame(); + $frame_type = $frm[0]; + $payload = $frm[1]; + + if($frame_type != 1) + throw new Exception("Expecting AMQP method, received frame type: $frame_type"); + + if(strlen($payload) < 4) + throw new Exception("Method frame too short"); + + $method_sig_array = unpack("n2", substr($payload,0,4)); + $method_sig = "" . $method_sig_array[1] . "," . $method_sig_array[2]; + $args = new AMQPReader(substr($payload,4)); + + global $METHOD_NAME_MAP; + if($this->debug) + { + debug_msg("> $method_sig: " . $METHOD_NAME_MAP[methodSig($method_sig)]); + } + + + if(in_array($method_sig, AbstractChannel::$CONTENT_METHODS)) + $content = $this->wait_content(); + else + $content = NULL; + + if($allowed_methods==NULL || + in_array($method_sig,$allowed_methods) || + in_array($method_sig,AbstractChannel::$CLOSE_METHODS)) + { + return $this->dispatch($method_sig, $args, $content); + } + + // Wasn't what we were looking for? save it for later + global $METHOD_NAME_MAP; + if($this->debug) + { + debug_msg("Queueing for later: $method_sig: " . $METHOD_NAME_MAP[methodSig($method_sig)]); + } + array_push($this->method_queue,array($method_sig, $args, $content)); + } + } + +} + +class AMQPConnection extends AbstractChannel +{ + public static $AMQP_PROTOCOL_HEADER = "AMQP\x01\x01\x09\x01"; + + public static $LIBRARY_PROPERTIES = array( + "library" => array('S', "PHP Simple AMQP lib"), + "library_version" => array('S', "0.1") + ); + + protected $method_map = array( + "10,10" => "start", + "10,20" => "secure", + "10,30" => "tune", + "10,41" => "open_ok", + "10,50" => "redirect", + "10,60" => "_close", + "10,61" => "close_ok" + ); + + public function __construct($host, $port, + $user, $password, + $vhost="/",$insist=false, + $login_method="AMQPLAIN", + $login_response=NULL, + $locale="en_US", + $connection_timeout = 3, + $read_write_timeout = 3) + { + + if($user && $password) + { + $login_response = new AMQPWriter(); + $login_response->write_table(array("LOGIN" => array('S',$user), + "PASSWORD" => array('S',$password))); + $login_response = substr($login_response->getvalue(),4); //Skip the length + } else + $login_response = NULL; + + + $d = AMQPConnection::$LIBRARY_PROPERTIES; + while(true) + { + $this->channels = array(); + // The connection object itself is treated as channel 0 + parent::__construct($this, 0); + + $this->channel_max = 65535; + $this->frame_max = 131072; + + $errstr = $errno = NULL; + $this->sock = NULL; + if (!($this->sock = fsockopen($host,$port,$errno,$errstr,$connection_timeout))) + { + throw new Exception ("Error Connecting to server($errno): $errstr "); + } + + stream_set_timeout($this->sock, $read_write_timeout); + stream_set_blocking($this->sock, 1); + $this->input = new AMQPReader(null, $this->sock); + + $this->write(AMQPConnection::$AMQP_PROTOCOL_HEADER); + $this->wait(array("10,10")); + $this->x_start_ok($d, $login_method, $login_response, $locale); + + $this->wait_tune_ok = true; + while($this->wait_tune_ok) + { + $this->wait(array( + "10,20", // secure + "10,30", // tune + )); + } + + $host = $this->x_open($vhost,"", $insist); + if(!$host) + return; // we weren't redirected + + // we were redirected, close the socket, loop and try again + if($this->debug) + { + debug_msg("closing socket"); + } + + @fclose($this->sock); $this->sock=NULL; + } + } + + public function __destruct() + { + if(isset($this->input)) + if($this->input) + $this->close(); + + if($this->sock) + { + if($this->debug) + { + debug_msg("closing socket"); + } + + @fclose($this->sock); + } + } + + protected function write($data) + { + if($this->debug) + { + debug_msg("< [hex]:\n" . hexdump($data, $htmloutput = false, $uppercase = true, $return = true)); + } + + $len = strlen($data); + while(true) + { + if(false == ($written = fwrite($this->sock, $data))) + { + throw new Exception ("Error sending data"); + } + $len = $len - $written; + if($len>0) + $data=substr($data,0-$len); + else + break; + } + } + + protected function do_close() + { + if(isset($this->input)) + if($this->input) + { + $this->input->close(); + $this->input = NULL; + } + + if($this->sock) + { + if($this->debug) + { + debug_msg("closing socket"); + } + + @fclose($this->sock); + $this->sock = NULL; + } + } + + public function get_free_channel_id() + { + for($i=1;$i<=$this->channel_max;$i++) + if(!array_key_exists($i,$this->channels)) + return $i; + throw new Exception("No free channel ids"); + } + + public function send_content($channel, $class_id, $weight, $body_size, + $packed_properties, $body) + { + $pkt = new AMQPWriter(); + + $pkt->write_octet(2); + $pkt->write_short($channel); + $pkt->write_long(strlen($packed_properties)+12); + + $pkt->write_short($class_id); + $pkt->write_short($weight); + $pkt->write_longlong($body_size); + $pkt->write($packed_properties); + + $pkt->write_octet(0xCE); + $pkt = $pkt->getvalue(); + $this->write($pkt); + + while($body) + { + $payload = substr($body,0, $this->frame_max-8); + $body = substr($body,$this->frame_max-8); + $pkt = new AMQPWriter(); + + $pkt->write_octet(3); + $pkt->write_short($channel); + $pkt->write_long(strlen($payload)); + + $pkt->write($payload); + + $pkt->write_octet(0xCE); + $pkt = $pkt->getvalue(); + $this->write($pkt); + } + } + + protected function send_channel_method_frame($channel, $method_sig, $args="") + { + if($args instanceof AMQPWriter) + $args = $args->getvalue(); + + $pkt = new AMQPWriter(); + + $pkt->write_octet(1); + $pkt->write_short($channel); + $pkt->write_long(strlen($args)+4); // 4 = length of class_id and method_id + // in payload + + $pkt->write_short($method_sig[0]); // class_id + $pkt->write_short($method_sig[1]); // method_id + $pkt->write($args); + + $pkt->write_octet(0xCE); + $pkt = $pkt->getvalue(); + $this->write($pkt); + + global $METHOD_NAME_MAP; + if($this->debug) + { + debug_msg("< " . methodSig($method_sig) . ": " . $METHOD_NAME_MAP[methodSig($method_sig)]); + } + + } + + /** + * Wait for a frame from the server + */ + protected function wait_frame() + { + $frame_type = $this->input->read_octet(); + $channel = $this->input->read_short(); + $size = $this->input->read_long(); + $payload = $this->input->read($size); + + $ch = $this->input->read_octet(); + if($ch != 0xCE) + throw new Exception(sprintf("Framing error, unexpected byte: %x", $ch)); + + return array($frame_type, $channel, $payload); + } + + /** + * Wait for a frame from the server destined for + * a particular channel. + */ + protected function wait_channel($channel_id) + { + while(true) + { + list($frame_type, $frame_channel, $payload) = $this->wait_frame(); + if($frame_channel == $channel_id) + return array($frame_type, $payload); + + // Not the channel we were looking for. Queue this frame + //for later, when the other channel is looking for frames. + array_push($this->channels[$frame_channel]->frame_queue, + array($frame_type, $payload)); + + // If we just queued up a method for channel 0 (the Connection + // itself) it's probably a close method in reaction to some + // error, so deal with it right away. + if(($frame_type == 1) && ($frame_channel == 0)) + $this->wait(); + } + } + + /** + * Fetch a Channel object identified by the numeric channel_id, or + * create that object if it doesn't already exist. + */ + public function channel($channel_id=NULL) + { + if(array_key_exists($channel_id,$this->channels)) + return $this->channels[$channel_id]; + + return new AMQPChannel($this->connection, $channel_id); + } + + /** + * request a connection close + */ + public function close($reply_code=0, $reply_text="", $method_sig=array(0, 0)) + { + $args = new AMQPWriter(); + $args->write_short($reply_code); + $args->write_shortstr($reply_text); + $args->write_short($method_sig[0]); // class_id + $args->write_short($method_sig[1]); // method_id + $this->send_method_frame(array(10, 60), $args); + return $this->wait(array( + "10,61", // Connection.close_ok + )); + } + + public static function dump_table($table) + { + $tokens = array(); + foreach ($table as $name => $value) + { + switch ($value[0]) + { + case 'D': + $val = $value[1]->n . 'E' . $value[1]->e; + break; + case 'F': + $val = '(' . self::dump_table($value[1]) . ')'; + case 'T': + $val = date('Y-m-d H:i:s', $value[1]); + break; + default: + $val = $value[1]; + } + $tokens[] = $name . '=' . $val; + } + return implode(', ', $tokens); + + } + + protected function _close($args) + { + $reply_code = $args->read_short(); + $reply_text = $args->read_shortstr(); + $class_id = $args->read_short(); + $method_id = $args->read_short(); + + $this->x_close_ok(); + + throw new AMQPConnectionException($reply_code, $reply_text, array($class_id, $method_id)); + } + + + /** + * confirm a connection close + */ + protected function x_close_ok() + { + $this->send_method_frame(array(10, 61)); + $this->do_close(); + } + + /** + * confirm a connection close + */ + protected function close_ok($args) + { + $this->do_close(); + } + + protected function x_open($virtual_host, $capabilities="", $insist=false) + { + $args = new AMQPWriter(); + $args->write_shortstr($virtual_host); + $args->write_shortstr($capabilities); + $args->write_bit($insist); + $this->send_method_frame(array(10, 40), $args); + return $this->wait(array( + "10,41", // Connection.open_ok + "10,50" // Connection.redirect + )); + } + + + /** + * signal that the connection is ready + */ + protected function open_ok($args) + { + $this->known_hosts = $args->read_shortstr(); + if($this->debug) + { + debug_msg("Open OK! known_hosts: " . $this->known_hosts); + } + + return NULL; + } + + + /** + * asks the client to use a different server + */ + protected function redirect($args) + { + $host = $args->read_shortstr(); + $this->known_hosts = $args->read_shortstr(); + if($this->debug) + { + debug_msg("Redirected to [". $host . "], known_hosts [" . $this->known_hosts . "]" ); + } + return $host; + } + + /** + * security mechanism challenge + */ + protected function secure($args) + { + $challenge = $args->read_longstr(); + } + + /** + * security mechanism response + */ + protected function x_secure_ok($response) + { + $args = new AMQPWriter(); + $args->write_longstr($response); + $this->send_method_frame(array(10, 21), $args); + } + + /** + * start connection negotiation + */ + protected function start($args) + { + $this->version_major = $args->read_octet(); + $this->version_minor = $args->read_octet(); + $this->server_properties = $args->read_table(); + $this->mechanisms = explode(" ", $args->read_longstr()); + $this->locales = explode(" ", $args->read_longstr()); + + if($this->debug) + { + debug_msg(sprintf("Start from server, version: %d.%d, properties: %s, mechanisms: %s, locales: %s", + $this->version_major, + $this->version_minor, + self::dump_table($this->server_properties), + implode(', ', $this->mechanisms), + implode(', ', $this->locales))); + } + + } + + + protected function x_start_ok($client_properties, $mechanism, $response, $locale) + { + $args = new AMQPWriter(); + $args->write_table($client_properties); + $args->write_shortstr($mechanism); + $args->write_longstr($response); + $args->write_shortstr($locale); + $this->send_method_frame(array(10, 11), $args); + } + + /** + * propose connection tuning parameters + */ + protected function tune($args) + { + $v=$args->read_short(); + if($v) + $this->channel_max = $v; + $v=$args->read_long(); + if($v) + $this->frame_max = $v; + $this->heartbeat = $args->read_short(); + + $this->x_tune_ok($this->channel_max, $this->frame_max, 0); + } + + /** + * negotiate connection tuning parameters + */ + protected function x_tune_ok($channel_max, $frame_max, $heartbeat) + { + $args = new AMQPWriter(); + $args->write_short($channel_max); + $args->write_long($frame_max); + $args->write_short($heartbeat); + $this->send_method_frame(array(10, 31), $args); + $this->wait_tune_ok = False; + } + +} + +class AMQPChannel extends AbstractChannel +{ + protected $method_map = array( + "20,11" => "open_ok", + "20,20" => "flow", + "20,21" => "flow_ok", + "20,30" => "alert", + "20,40" => "_close", + "20,41" => "close_ok", + "30,11" => "access_request_ok", + "40,11" => "exchange_declare_ok", + "40,21" => "exchange_delete_ok", + "50,11" => "queue_declare_ok", + "50,21" => "queue_bind_ok", + "50,31" => "queue_purge_ok", + "50,41" => "queue_delete_ok", + "60,11" => "basic_qos_ok", + "60,21" => "basic_consume_ok", + "60,31" => "basic_cancel_ok", + "60,50" => "basic_return", + "60,60" => "basic_deliver", + "60,71" => "basic_get_ok", + "60,72" => "basic_get_empty", + "90,11" => "tx_select_ok", + "90,21" => "tx_commit_ok", + "90,31" => "tx_rollback_ok" + ); + + public function __construct($connection, + $channel_id=NULL, + $auto_decode=true) + { + + if($channel_id == NULL) + $channel_id = $connection->get_free_channel_id(); + + parent::__construct($connection, $channel_id); + + if($this->debug) + { + debug_msg("using channel_id: " . $channel_id); + } + + $this->default_ticket = 0; + $this->is_open = false; + $this->active = true; // Flow control + $this->alerts = array(); + $this->callbacks = array(); + $this->auto_decode = $auto_decode; + + $this->x_open(); + } + + public function __destruct() + { + //TODO:???if($this->connection) + // $this->close("destroying channel"); + } + + /** + * Tear down this object, after we've agreed to close with the server. + */ + protected function do_close() + { + $this->is_open = false; + unset($this->connection->channels[$this->channel_id]); + $this->channel_id = $this->connection = NULL; + } + + /** + * This method allows the server to send a non-fatal warning to + * the client. This is used for methods that are normally + * asynchronous and thus do not have confirmations, and for which + * the server may detect errors that need to be reported. Fatal + * errors are handled as channel or connection exceptions; non- + * fatal errors are sent through this method. + */ + protected function alert($args) + { + $reply_code = $args->read_short(); + $reply_text = $args->read_shortstr(); + $details = $args->read_table(); + + array_push($this->alerts,array($reply_code, $reply_text, $details)); + } + + /** + * request a channel close + */ + public function close($reply_code=0, + $reply_text="", + $method_sig=array(0, 0)) + { + $args = new AMQPWriter(); + $args->write_short($reply_code); + $args->write_shortstr($reply_text); + $args->write_short($method_sig[0]); // class_id + $args->write_short($method_sig[1]); // method_id + $this->send_method_frame(array(20, 40), $args); + return $this->wait(array( + "20,41" // Channel.close_ok + )); + } + + + protected function _close($args) + { + $reply_code = $args->read_short(); + $reply_text = $args->read_shortstr(); + $class_id = $args->read_short(); + $method_id = $args->read_short(); + + $this->send_method_frame(array(20, 41)); + $this->do_close(); + + throw new AMQPChannelException($reply_code, $reply_text, + array($class_id, $method_id)); + } + + /** + * confirm a channel close + */ + protected function close_ok($args) + { + $this->do_close(); + } + + /** + * enable/disable flow from peer + */ + public function flow($active) + { + $args = new AMQPWriter(); + $args->write_bit($active); + $this->send_method_frame(array(20, 20), $args); + return $this->wait(array( + "20,21" //Channel.flow_ok + )); + } + + protected function _flow($args) + { + $this->active = $args->read_bit(); + $this->x_flow_ok($this->active); + } + + protected function x_flow_ok($active) + { + $args = new AMQPWriter(); + $args->write_bit($active); + $this->send_method_frame(array(20, 21), $args); + } + + protected function flow_ok($args) + { + return $args->read_bit(); + } + + protected function x_open($out_of_band="") + { + if($this->is_open) + return; + + $args = new AMQPWriter(); + $args->write_shortstr($out_of_band); + $this->send_method_frame(array(20, 10), $args); + return $this->wait(array( + "20,11" //Channel.open_ok + )); + } + + protected function open_ok($args) + { + $this->is_open = true; + if($this->debug) + { + debug_msg("Channel open"); + } + } + + /** + * request an access ticket + */ + public function access_request($realm, $exclusive=false, + $passive=false, $active=false, $write=false, $read=false) + { + $args = new AMQPWriter(); + $args->write_shortstr($realm); + $args->write_bit($exclusive); + $args->write_bit($passive); + $args->write_bit($active); + $args->write_bit($write); + $args->write_bit($read); + $this->send_method_frame(array(30, 10), $args); + return $this->wait(array( + "30,11" //Channel.access_request_ok + )); + } + + /** + * grant access to server resources + */ + protected function access_request_ok($args) + { + $this->default_ticket = $args->read_short(); + return $this->default_ticket; + } + + + /** + * declare exchange, create if needed + */ + public function exchange_declare($exchange, + $type, + $passive=false, + $durable=false, + $auto_delete=true, + $internal=false, + $nowait=false, + $arguments=NULL, + $ticket=NULL) + { + if($arguments==NULL) + $arguments = array(); + + $args = new AMQPWriter(); + if($ticket != NULL) + $args->write_short($ticket); + else + $args->write_short($this->default_ticket); + $args->write_shortstr($exchange); + $args->write_shortstr($type); + $args->write_bit($passive); + $args->write_bit($durable); + $args->write_bit($auto_delete); + $args->write_bit($internal); + $args->write_bit($nowait); + $args->write_table($arguments); + $this->send_method_frame(array(40, 10), $args); + + if(!$nowait) + return $this->wait(array( + "40,11" //Channel.exchange_declare_ok + )); + } + + /** + * confirms an exchange declaration + */ + protected function exchange_declare_ok($args) + { + } + + /** + * delete an exchange + */ + public function exchange_delete($exchange, $if_unused=false, + $nowait=false, $ticket=NULL) + { + $args = new AMQPWriter(); + if($ticket != NULL) + $args->write_short($ticket); + else + $args->write_short($this->default_ticket); + $args->write_shortstr($exchange); + $args->write_bit($if_unused); + $args->write_bit($nowait); + $this->send_method_frame(array(40, 20), $args); + + if(!$nowait) + return $this->wait(array( + "40,21" //Channel.exchange_delete_ok + )); + } + + /** + * confirm deletion of an exchange + */ + protected function exchange_delete_ok($args) + { + } + + + /** + * bind queue to an exchange + */ + public function queue_bind($queue, $exchange, $routing_key="", + $nowait=false, $arguments=NULL, $ticket=NULL) + { + if($arguments == NULL) + $arguments = array(); + + $args = new AMQPWriter(); + if($ticket != NULL) + $args->write_short($ticket); + else + $args->write_short($this->default_ticket); + $args->write_shortstr($queue); + $args->write_shortstr($exchange); + $args->write_shortstr($routing_key); + $args->write_bit($nowait); + $args->write_table($arguments); + $this->send_method_frame(array(50, 20), $args); + + if(!$nowait) + return $this->wait(array( + "50,21" // Channel.queue_bind_ok + )); + } + + /** + * confirm bind successful + */ + protected function queue_bind_ok($args) + { + } + + /** + * declare queue, create if needed + */ + public function queue_declare($queue="", + $passive=false, + $durable=false, + $exclusive=false, + $auto_delete=true, + $nowait=false, + $arguments=NULL, + $ticket=NULL) + { + if($arguments == NULL) + $arguments = array(); + + $args = new AMQPWriter(); + if($ticket != NULL) + $args->write_short($ticket); + else + $args->write_short($this->default_ticket); + $args->write_shortstr($queue); + $args->write_bit($passive); + $args->write_bit($durable); + $args->write_bit($exclusive); + $args->write_bit($auto_delete); + $args->write_bit($nowait); + $args->write_table($arguments); + $this->send_method_frame(array(50, 10), $args); + + if(!$nowait) + return $this->wait(array( + "50,11" // Channel.queue_declare_ok + )); + } + + /** + * confirms a queue definition + */ + protected function queue_declare_ok($args) + { + $queue = $args->read_shortstr(); + $message_count = $args->read_long(); + $consumer_count = $args->read_long(); + + return array($queue, $message_count, $consumer_count); + } + + /** + * delete a queue + */ + public function queue_delete($queue="", $if_unused=false, $if_empty=false, + $nowait=false, $ticket=NULL) + { + $args = new AMQPWriter(); + if($ticket != NULL) + $args->write_short($ticket); + else + $args->write_short($this->default_ticket); + + $args->write_shortstr($queue); + $args->write_bit($if_unused); + $args->write_bit($if_empty); + $args->write_bit($nowait); + $this->send_method_frame(array(50, 40), $args); + + if(!$nowait) + return $this->wait(array( + "50,41" //Channel.queue_delete_ok + )); + } + + /** + * confirm deletion of a queue + */ + protected function queue_delete_ok($args) + { + return $args->read_long(); + } + + /** + * purge a queue + */ + public function queue_purge($queue="", $nowait=false, $ticket=NULL) + { + $args = new AMQPWriter(); + if($ticket != NULL) + $args->write_short($ticket); + else + $args->write_short($this->default_ticket); + $args->write_shortstr($queue); + $args->write_bit($nowait); + $this->send_method_frame(array(50, 30), $args); + + if(!$nowait) + return $this->wait(array( + "50,31" //Channel.queue_purge_ok + )); + } + + /** + * confirms a queue purge + */ + protected function queue_purge_ok($args) + { + return $args->read_long(); + } + + /** + * acknowledge one or more messages + */ + public function basic_ack($delivery_tag, $multiple=false) + { + $args = new AMQPWriter(); + $args->write_longlong($delivery_tag); + $args->write_bit($multiple); + $this->send_method_frame(array(60, 80), $args); + } + + /** + * end a queue consumer + */ + public function basic_cancel($consumer_tag, $nowait=false) + { + $args = new AMQPWriter(); + $args->write_shortstr($consumer_tag); + $args->write_bit($nowait); + $this->send_method_frame(array(60, 30), $args); + return $this->wait(array( + "60,31" // Channel.basic_cancel_ok + )); + } + + /** + * confirm a cancelled consumer + */ + protected function basic_cancel_ok($args) + { + $consumer_tag = $args->read_shortstr(); + unset($this->callbacks[$consumer_tag]); + } + + /** + * start a queue consumer + */ + public function basic_consume($queue="", $consumer_tag="", $no_local=false, + $no_ack=false, $exclusive=false, $nowait=false, + $callback=NULL, $ticket=NULL) + { + $args = new AMQPWriter(); + if($ticket != NULL) + $args->write_short($ticket); + else + $args->write_short($this->default_ticket); + $args->write_shortstr($queue); + $args->write_shortstr($consumer_tag); + $args->write_bit($no_local); + $args->write_bit($no_ack); + $args->write_bit($exclusive); + $args->write_bit($nowait); + $this->send_method_frame(array(60, 20), $args); + + if(!$nowait) + $consumer_tag = $this->wait(array( + "60,21" //Channel.basic_consume_ok + )); + + $this->callbacks[$consumer_tag] = $callback; + return $consumer_tag; + } + + /** + * confirm a new consumer + */ + protected function basic_consume_ok($args) + { + return $args->read_shortstr(); + } + + /** + * notify the client of a consumer message + */ + protected function basic_deliver($args, $msg) + { + $consumer_tag = $args->read_shortstr(); + $delivery_tag = $args->read_longlong(); + $redelivered = $args->read_bit(); + $exchange = $args->read_shortstr(); + $routing_key = $args->read_shortstr(); + + $msg->delivery_info = array( + "channel" => $this, + "consumer_tag" => $consumer_tag, + "delivery_tag" => $delivery_tag, + "redelivered" => $redelivered, + "exchange" => $exchange, + "routing_key" => $routing_key + ); + + if(array_key_exists($consumer_tag, $this->callbacks)) + $func = $this->callbacks[$consumer_tag]; + else + $func = NULL; + + if($func!=NULL) + call_user_func($func, $msg); + } + + /** + * direct access to a queue + */ + public function basic_get($queue="", $no_ack=false, $ticket=NULL) + { + $args = new AMQPWriter(); + if($ticket != NULL) + $args->write_short($ticket); + else + $args->write_short($this->default_ticket); + $args->write_shortstr($queue); + $args->write_bit($no_ack); + $this->send_method_frame(array(60, 70), $args); + return $this->wait(array( + "60,71", //Channel.basic_get_ok + "60,72" // Channel.basic_get_empty + )); + } + + /** + * indicate no messages available + */ + protected function basic_get_empty($args) + { + $cluster_id = $args->read_shortstr(); + } + + /** + * provide client with a message + */ + protected function basic_get_ok($args, $msg) + { + $delivery_tag = $args->read_longlong(); + $redelivered = $args->read_bit(); + $exchange = $args->read_shortstr(); + $routing_key = $args->read_shortstr(); + $message_count = $args->read_long(); + + $msg->delivery_info = array( + "delivery_tag" => $delivery_tag, + "redelivered" => $redelivered, + "exchange" => $exchange, + "routing_key" => $routing_key, + "message_count" => $message_count + ); + return $msg; + } + + /** + * publish a message + */ + public function basic_publish($msg, $exchange="", $routing_key="", + $mandatory=false, $immediate=false, + $ticket=NULL) + { + $args = new AMQPWriter(); + if($ticket != NULL) + $args->write_short($ticket); + else + $args->write_short($this->default_ticket); + $args->write_shortstr($exchange); + $args->write_shortstr($routing_key); + $args->write_bit($mandatory); + $args->write_bit($immediate); + $this->send_method_frame(array(60, 40), $args); + + $this->connection->send_content($this->channel_id, 60, 0, + strlen($msg->body), + $msg->serialize_properties(), + $msg->body); + } + + + /** + * specify quality of service + */ + public function basic_qos($prefetch_size, $prefetch_count, $a_global) + { + $args = new AMQPWriter(); + $args->write_long($prefetch_size); + $args->write_short($prefetch_count); + $args->write_bit($a_global); + $this->send_method_frame(array(60, 10), $args); + return $this->wait(array( + "60,11" //Channel.basic_qos_ok + )); + } + + + /** + * confirm the requested qos + */ + protected function basic_qos_ok($args) + { + } + + /** + * redeliver unacknowledged messages + */ + public function basic_recover($requeue=false) + { + $args = new AMQPWriter(); + $args->write_bit($requeue); + $this->send_method_frame(array(60, 100), $args); + } + + /** + * reject an incoming message + */ + public function basic_reject($delivery_tag, $requeue) + { + $args = new AMQPWriter(); + $args->write_longlong($delivery_tag); + $args->write_bit($requeue); + $this->send_method_frame(array(60, 90), $args); + } + + /** + * return a failed message + */ + protected function basic_return($args) + { + $reply_code = $args->read_short(); + $reply_text = $args->read_shortstr(); + $exchange = $args->read_shortstr(); + $routing_key = $args->read_shortstr(); + $msg = $this->wait(); + } + + /** + * confirm a successful commit + */ + protected function tx_commit_ok($args) + { + } + + + /** + * abandon the current transaction + */ + public function tx_rollback() + { + $this->send_method_frame(array(90, 30)); + return $this->wait(array( + "90,31" //Channel.tx_rollback_ok + )); + } + + /** + * confirm a successful rollback + */ + protected function tx_rollback_ok($args) + { + } + + /** + * select standard transaction mode + */ + public function tx_select() + { + $this->send_method_frame(array(90, 10)); + return $this->wait(array( + "90,11" //Channel.tx_select_ok + )); + } + + /** + * confirm transaction mode + */ + protected function tx_select_ok($args) + { + } + +} + +/** + * A Message for use with the Channnel.basic_* methods. + */ +class AMQPMessage extends GenericContent +{ + protected static $PROPERTIES = array( + "content_type" => "shortstr", + "content_encoding" => "shortstr", + "application_headers" => "table", + "delivery_mode" => "octet", + "priority" => "octet", + "correlation_id" => "shortstr", + "reply_to" => "shortstr", + "expiration" => "shortstr", + "message_id" => "shortstr", + "timestamp" => "timestamp", + "type" => "shortstr", + "user_id" => "shortstr", + "app_id" => "shortstr", + "cluster_id" => "shortst" + ); + + public function __construct($body = '', $properties = null) + { + $this->body = $body; + + parent::__construct($properties, $prop_types=AMQPMessage::$PROPERTIES); + } +} + +?> diff --git a/amqp_test.php b/amqp_test.php new file mode 100644 index 0000000..35c1ce5 --- /dev/null +++ b/amqp_test.php @@ -0,0 +1,45 @@ +channel(); + echo "Requesting access\n"; + $ch->access_request('/data', false, false, true, true); + + echo "Declaring exchange\n"; + $ch->exchange_declare($EXCHANGE, 'direct', false, false, false); + echo "Creating message\n"; + $msg = new AMQPMessage($msg_body, array('content_type' => 'text/plain')); + + echo "Publishing message\n"; + $ch->basic_publish($msg, $EXCHANGE, $QUEUE); + + echo "Closing channel\n"; + $ch->close(); + echo "Closing connection\n"; + $conn->close(); + echo "Done.\n"; +} catch (Exception $e) { + echo 'Caught exception: ', $e->getMessage(); + echo "\nTrace:\n" . $e->getTraceAsString(); +} +?> diff --git a/amqp_wire.inc b/amqp_wire.inc new file mode 100644 index 0000000..fcd23ce --- /dev/null +++ b/amqp_wire.inc @@ -0,0 +1,607 @@ + + * + * + * To understand all signed/unsinged and 32/64 bit madness in this + * code, please read first the following article: + * + * http://www.mysqlperformanceblog.com/2007/03/27/integers-in-php-running-with-scissors-and-portability/ + */ + +require_once('hexdump.inc'); + + /** + * AMQP protocol decimal value. + * + * Values are represented as (n,e) pairs. The actual value + * is n * 10^(-e). + * + * From 0.8 spec: Decimal values are + * not intended to support floating point values, but rather + * business values such as currency rates and amounts. The + * 'decimals' octet is not signed. + */ +class AMQPDecimal +{ + public function __construct($n, $e) + { + if($e < 0) + throw new Exception("Decimal exponent value must be unsigned!"); + $this->n = $n; + $this->e = $e; + } + + public function asBCvalue() + { + return bcdiv($n, bcpow(10,$e)); + } +} + +class AMQPWriter +{ + public function __construct() + { + $this->out = ""; + $this->bits = array(); + $this->bitcount = 0; + } + + private static function chrbytesplit($x, $bytes) + { + return array_map('chr', AMQPWriter::bytesplit($x,$bytes)); + } + + /** + * Splits number (could be either int or string) into array of byte + * values (represented as integers) in big-endian byte order. + */ + private static function bytesplit($x, $bytes) + { + if(is_int($x)) + { + if($x<0) + $x = sprintf("%u", $x); + } + + $res = array(); + for($i=0;$i<$bytes;$i++) + { + $b = bcmod($x,'256'); + array_unshift($res,(int)$b); + $x=bcdiv($x,'256'); + } + if($x!=0) + throw new Exception("Value too big!"); + return $res; + } + + private function flushbits() + { + if(count($this->bits)) + { + $this->out .= implode("", array_map('chr',$this->bits)); + $this->bits = array(); + $this->bitcount = 0; + } + } + + /** + * Get what's been encoded so far. + */ + public function getvalue() + { + $this->flushbits(); + return $this->out; + } + + /** + * Write a plain Python string, with no special encoding. + */ + public function write($s) + { + $this->flushbits(); + $this->out .= $s; + } + + /** + * Write a boolean value. + */ + public function write_bit($b) + { + if($b) + $b = 1; + else + $b = 0; + $shift = $this->bitcount % 8; + if($shift == 0) + $last = 0; + else + $last = array_pop($this->bits); + + $last |= ($b << $shift); + array_push($this->bits, $last); + + $this->bitcount += 1; + } + + /** + * Write an integer as an unsigned 8-bit value. + */ + public function write_octet($n) + { + if($n < 0 || $n > 255) + throw new Exception('Octet out of range 0..255'); + $this->flushbits(); + $this->out .= chr($n); + } + + /** + * Write an integer as an unsigned 16-bit value. + */ + public function write_short($n) + { + if($n < 0 || $n > 65535) + throw new Exception('Octet out of range 0..65535'); + $this->flushbits(); + $this->out .= pack('n', $n); + } + + /** + * Write an integer as an unsigned 32-bit value. + */ + public function write_long($n) + { + $this->flushbits(); + $this->out .= implode("", AMQPWriter::chrbytesplit($n,4)); + } + + private function write_signed_long($n) + { + $this->flushbits(); + // although format spec for 'N' mentions unsigned + // it will deal with sinned integers as well. tested. + $this->out .= pack('N', $n); + } + + /** + * Write an integer as an unsigned 64-bit value. + */ + public function write_longlong($n) + { + $this->flushbits(); + $this->out .= implode("", AMQPWriter::chrbytesplit($n,8)); + } + + /** + * Write a string up to 255 bytes long after encoding. + * Assume UTF-8 encoding. + */ + public function write_shortstr($s) + { + $this->flushbits(); + if(strlen($s) > 255) + throw new Exception('String too long'); + $this->write_octet(strlen($s)); + $this->out .= $s; + } + + + /* + * Write a string up to 2**32 bytes long. Assume UTF-8 encoding. + */ + public function write_longstr($s) + { + $this->flushbits(); + $this->write_long(strlen($s)); + $this->out .= $s; + } + + + /** + * Write unix time_t value as 64 bit timestamp. + */ + public function write_timestamp($v) + { + $this->write_longlong($v); + } + + /** + * Write PHP array, as table. Input array format: keys are strings, + * values are (type,value) tuples. + */ + public function write_table($d) + { + $this->flushbits(); + $table_data = new AMQPWriter(); + foreach($d as $k=>$va) + { + list($ftype,$v) = $va; + $table_data->write_shortstr($k); + if($ftype=='S') + { + $table_data->write('S'); + $table_data->write_longstr($v); + } else if($ftype=='I') + { + $table_data->write('I'); + $table_data->write_signed_long($v); + } else if($ftype=='D') + { + // 'D' type values are passed AMQPDecimal instances. + $table_data->write('D'); + $table_data->write_octet($v->e); + $table_data->write_signed_long($v->n); + } else if($ftype=='T') + { + $table_data->write('T'); + $table_data->write_timestamp($v); + } else if($ftype='F') + { + $table_data->write('F'); + $table_data->write_table($v); + } + } + $table_data = $table_data->getvalue(); + $this->write_long(strlen($table_data)); + $this->write($table_data); + } +} + +class AMQPReader +{ + public function __construct($str, $sock=NULL) + { + $this->str = $str; + $this->sock = $sock; + $this->offset = 0; + + $this->bitcount = $this->bits = 0; + + if(((int)4294967296)!=0) + $this->is64bits = true; + else + $this->is64bits = false; + + if(!function_exists("bcmul")) + throw new Exception("'bc math' module required"); + } + + public function close() + { + if($this->sock) + fclose($this->sock); + } + + public function read($n) + { + $this->bitcount = $this->bits = 0; + return $this->rawread($n); + } + + private function rawread($n) + { + if($this->sock) + { + $res = ''; + $read = 0; + + while ($read < $n && (false !== ($buf = fread($this->sock, $n - $read)))) + { + $read += strlen($buf); + $res .= $buf; + } + + if(strlen($res)!=$n) + throw new Exception ("Error reading data. Recevived " . + strlen($res) . " instead of expected $n bytes"); + $this->offset += $n; + } else + { + if(strlen($this->str) < $n) + throw new Exception ("Error reading data. Requested $n bytes while string buffer has only " . + strlen($this->str)); + $res = substr($this->str,0,$n); + $this->str = substr($this->str,$n); + $this->offset += $n; + } + return $res; + } + + public function read_bit() + { + if(!$this->bitcount) + { + $this->bits = ord($this->rawread(1)); + $this->bitcount = 8; + } + $result = ($this->bits & 1) == 1; + $this->bits >>= 1; + $this->bitcount -= 1; + return $result; + } + + public function read_octet() + { + $this->bitcount = $this->bits = 0; + list(,$res) = unpack('C', $this->rawread(1)); + return $res; + } + + public function read_short() + { + $this->bitcount = $this->bits = 0; + list(,$res) = unpack('n', $this->rawread(2)); + return $res; + } + + /** + * Reads 32 bit integer in big-endian byte order. + * + * On 64 bit systems it will return always usngined int + * value in 0..2^32 range. + * + * On 32 bit systems it will return signed int value in + * -2^31...+2^31 range. + * + * Use with caution! + */ + public function read_php_int() + { + list(,$res) = unpack('N', $this->rawread(4)); + if($this->is64bits) + { + $sres = sprintf ( "%u", $res ); + return (int)$sres; + } else { + return $res; + } + } + + // PHP does not have unsigned 32 bit int, + // so we return it as a string + public function read_long() + { + $this->bitcount = $this->bits = 0; + list(,$res) = unpack('N', $this->rawread(4)); + $sres = sprintf ( "%u", $res ); + return $sres; + } + + private function read_signed_long() + { + $this->bitcount = $this->bits = 0; + // In PHP unpack('N') always return signed value, + // on both 32 and 64 bit systems! + list(,$res) = unpack('N', $this->rawread(4)); + return $res; + } + + // Even on 64 bit systems PHP integers are singed. + // Since we need an unsigned value here we return it + // as a string. + public function read_longlong() + { + $this->bitcount = $this->bits = 0; + $hi = unpack('N', $this->rawread(4)); + $lo = unpack('N', $this->rawread(4)); + + // workaround signed/unsigned braindamage in php + $hi = sprintf ( "%u", $hi[1] ); + $lo = sprintf ( "%u", $lo[1] ); + + return bcadd(bcmul($hi, "4294967296" ), $lo); + } + + /** + * Read a utf-8 encoded string that's stored in up to + * 255 bytes. Return it decoded as a Python unicode object. + */ + public function read_shortstr() + { + $this->bitcount = $this->bits = 0; + list(,$slen) = unpack('C', $this->rawread(1)); + return $this->rawread($slen); + } + + /** + * Read a string that's up to 2**32 bytes, the encoding + * isn't specified in the AMQP spec, so just return it as + * a plain PHP string. + */ + public function read_longstr() + { + $this->bitcount = $this->bits = 0; + $slen = $this->read_php_int(); + if($slen<0) + throw new Exception("Strings longer than supported on this platform"); + return $this->rawread($slen); + } + + /** + * Read and AMQP timestamp, which is a 64-bit integer representing + * seconds since the Unix epoch in 1-second resolution. + */ + function read_timestamp() + { + return $this->read_longlong(); + } + + /** + * Read an AMQP table, and return as a PHP array. keys are strings, + * values are (type,value) tuples. + */ + public function read_table() + { + $this->bitcount = $this->bits = 0; + $tlen = $this->read_php_int(); + if($tlen<0) + throw new Exception("Table is longer than supported"); + $table_data = new AMQPReader($this->rawread($tlen)); + $result = array(); + while($table_data->tell() < $tlen) + { + $name = $table_data->read_shortstr(); + $ftype = $table_data->rawread(1); + if($ftype == 'S') { + $val = $table_data->read_longstr(); + } else if(ftype == 'I') { + $val = $table_data->read_signed_long(); + } else if($ftype == 'D') + { + $e = $table_data->read_octet(); + $n = $table_data->read_signed_long(); + $val = new AMQPDecimal($n, $e); + } else if($ftype == 'T') + { + $val = $table_data->read_timestamp(); + } else if($ftype == 'F') + { + $val = $table_data->read_table(); // recursion + } else { + error_log("Usupported table field type $ftype"); + $val = NULL; + } + $result[$name] = array($ftype,$val); + } + return $result; + } + + + protected function tell() + { + return $this->offset; + } + +} + + +/** + * Abstract base class for AMQP content. Subclasses should override + * the PROPERTIES attribute. + */ +class GenericContent +{ + protected static $PROPERTIES = array( + "dummy" => "shortstr" + ); + + public function __construct($props, $prop_types=NULL) + { + if($prop_types) + $this->prop_types = $prop_types; + else + $this->prop_types = GenericContent::$PROPERTIES; + $d = array(); + if ($props) + $d = array_intersect_key($props, $this->prop_types); + else + $d = array(); + $this->properties = $d; + } + + + /** + * Look for additional properties in the 'properties' dictionary, + * and if present - the 'delivery_info' dictionary. + */ + public function get($name) + { + if(array_key_exists($name,$this->properties)) + return $this->properties[$name]; + + if(isset($this->delivery_info)) + if(array_key_exists($name,$this->delivery_info)) + return $this->delivery_info[$name]; + + throw new Exception("No such property"); + } + + + /** + * Given the raw bytes containing the property-flags and + * property-list from a content-frame-header, parse and insert + * into a dictionary stored in this object as an attribute named + * 'properties'. + */ + public function load_properties($raw_bytes) + { + $r = new AMQPReader($raw_bytes); + + // Read 16-bit shorts until we get one with a low bit set to zero + $flags = array(); + while(true) + { + $flag_bits = $r->read_short(); + array_push($flags, $flag_bits); + if(($flag_bits & 1) == 0) + break; + } + + $shift = 0; + $d = array(); + foreach ($this->prop_types as $key => $proptype) + { + if($shift == 0) { + if(!$flags) { + break; + } + $flag_bits = array_shift($flags); + $shift = 15; + } + if($flag_bits & (1 << $shift)) + $d[$key] = call_user_func(array($r,"read_".$proptype)); + $shift -= 1; + } + $this->properties = $d; + } + + + /** + * serialize the 'properties' attribute (a dictionary) into the + * raw bytes making up a set of property flags and a property + * list, suitable for putting into a content frame header. + */ + public function serialize_properties() + { + $shift = 15; + $flag_bits = 0; + $flags = array(); + $raw_bytes = new AMQPWriter(); + foreach ($this->prop_types as $key => $proptype) + { + if(array_key_exists($key,$this->properties)) + $val = $this->properties[$key]; + else + $val = NULL; + if($val != NULL) + { + if($shift == 0) + { + array_push($flags, $flag_bits); + $flag_bits = 0; + $shift = 15; + } + + $flag_bits |= (1 << $shift); + if($proptype != "bit") + call_user_func(array($raw_bytes, "write_" . $proptype), + $val); + } + $shift -= 1; + } + array_push($flags, $flag_bits); + $result = new AMQPWriter(); + foreach($flags as $flag_bits) + $result->write_short($flag_bits); + $result->write($raw_bytes->getvalue()); + + return $result->getvalue(); + } +} + +?> diff --git a/demo/amqp_consumer.php b/demo/amqp_consumer.php new file mode 100644 index 0000000..2e10f34 --- /dev/null +++ b/demo/amqp_consumer.php @@ -0,0 +1,53 @@ +#!/usr/bin/php + + */ + +require_once('../amqp.inc'); + +$HOST = 'localhost'; +$PORT = 5672; +$USER = 'guest'; +$PASS = 'guest'; +$VHOST = '/'; +$EXCHANGE = 'router'; +$QUEUE = 'msgs'; +$CONSUMER_TAG = 'consumer'; + +$conn = new AMQPConnection($HOST, $PORT, $USER, $PASS); +$ch = $conn->channel(); +$ch->access_request($VHOST, false, false, true, true); + +$ch->queue_declare($QUEUE); +$ch->exchange_declare($EXCHANGE, 'direct', false, false, false); +$ch->queue_bind($QUEUE, $EXCHANGE); + +function process_message($msg) { + global $ch, $CONSUMER_TAG; + + echo "\n--------\n"; + echo $msg->body; + echo "\n--------\n"; + + $ch->basic_ack($msg->delivery_info['delivery_tag']); + + // Cancel callback + if ($msg->body === 'quit') { + $ch->basic_cancel($CONSUMER_TAG); + } +} + +$ch->basic_consume($QUEUE, $CONSUMER_TAG, false, false, false, false, 'process_message'); + +// Loop as long as the channel has callbacks registered +while(count($ch->callbacks)) { + $ch->wait(); +} + +$ch->close(); +$conn->close(); +?> diff --git a/demo/amqp_publisher.php b/demo/amqp_publisher.php new file mode 100644 index 0000000..00a9999 --- /dev/null +++ b/demo/amqp_publisher.php @@ -0,0 +1,29 @@ +#!/usr/bin/php + + */ + +require_once('../amqp.inc'); + +$HOST = 'localhost'; +$PORT = 5672; +$USER = 'guest'; +$PASS = 'guest'; +$VHOST = '/'; +$EXCHANGE = 'router'; +$QUEUE = 'msgs'; + +$conn = new AMQPConnection($HOST, $PORT, $USER, $PASS); +$ch = $conn->channel(); +$ch->access_request($VHOST, false, false, true, true); + +$msg_body = implode(' ', array_slice($argv, 1)); +$msg = new AMQPMessage($msg_body, array('content_type' => 'text/plain')); +$ch->basic_publish($msg, $EXCHANGE); + +$ch->close(); +$conn->close(); +?> diff --git a/hexdump.inc b/hexdump.inc new file mode 100644 index 0000000..8304649 --- /dev/null +++ b/hexdump.inc @@ -0,0 +1,82 @@ + + * @author Peter Waller + * @link http://aidanlister.com/repos/v/function.hexdump.php + * @param string $data The string to be dumped + * @param bool $htmloutput Set to false for non-HTML output + * @param bool $uppercase Set to true for uppercase hex + * @param bool $return Set to true to return the dump + */ +function hexdump ($data, $htmloutput = true, $uppercase = false, $return = false) +{ + // Init + $hexi = ''; + $ascii = ''; + $dump = ($htmloutput === true) ? '
' : '';
+    $offset = 0;
+    $len    = strlen($data);
+
+    // Upper or lower case hexidecimal
+    $x = ($uppercase === false) ? 'x' : 'X';
+
+    // Iterate string
+    for ($i = $j = 0; $i < $len; $i++)
+    {
+        // Convert to hexidecimal
+        $hexi .= sprintf("%02$x ", ord($data[$i]));
+
+        // Replace non-viewable bytes with '.'
+        if (ord($data[$i]) >= 32) {
+            $ascii .= ($htmloutput === true) ?
+                            htmlentities($data[$i]) :
+                            $data[$i];
+        } else {
+            $ascii .= '.';
+        }
+
+        // Add extra column spacing
+        if ($j === 7) {
+            $hexi  .= ' ';
+            $ascii .= ' ';
+        }
+
+        // Add row
+        if (++$j === 16 || $i === $len - 1) {
+            // Join the hexi / ascii output
+            $dump .= sprintf("%04$x  %-49s  %s", $offset, $hexi, $ascii);
+            
+            // Reset vars
+            $hexi   = $ascii = '';
+            $offset += 16;
+            $j      = 0;
+            
+            // Add newline            
+            if ($i !== $len - 1) {
+                $dump .= "\n";
+            }
+        }
+    }
+
+    // Finish dump
+    $dump .= $htmloutput === true ?
+                '
' : + ''; + $dump .= "\n"; + + // Output method + if ($return === false) { + echo $dump; + } else { + return $dump; + } +} + +?> \ No newline at end of file