Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Initial revision

  • Loading branch information...
commit d514dce7daf10cfc727cca9095ef0d114f9e4592 0 parents
@tonyg tonyg authored
3  .hgignore
@@ -0,0 +1,3 @@
+\.pyc$
+^rabbitmq/spec.py$
+^MANIFEST$
5 COPYING
@@ -0,0 +1,5 @@
+This package, the RabbitMQ C client, is licensed under the MPL. For
+the MPL, please see LICENSE-MPL-RabbitMQ.
+
+If you have any questions regarding licensing, please contact us at
+info@rabbitmq.com.
473 LICENSE-MPL-RabbitMQ
@@ -0,0 +1,473 @@
+ MOZILLA PUBLIC LICENSE
+ Version 1.1
+
+ ---------------
+
+1. Definitions.
+
+ 1.0.1. "Commercial Use" means distribution or otherwise making the
+ Covered Code available to a third party.
+
+ 1.1. "Contributor" means each entity that creates or contributes to
+ the creation of Modifications.
+
+ 1.2. "Contributor Version" means the combination of the Original
+ Code, prior Modifications used by a Contributor, and the Modifications
+ made by that particular Contributor.
+
+ 1.3. "Covered Code" means the Original Code or Modifications or the
+ combination of the Original Code and Modifications, in each case
+ including portions thereof.
+
+ 1.4. "Electronic Distribution Mechanism" means a mechanism generally
+ accepted in the software development community for the electronic
+ transfer of data.
+
+ 1.5. "Executable" means Covered Code in any form other than Source
+ Code.
+
+ 1.6. "Initial Developer" means the individual or entity identified
+ as the Initial Developer in the Source Code notice required by Exhibit
+ A.
+
+ 1.7. "Larger Work" means a work which combines Covered Code or
+ portions thereof with code not governed by the terms of this License.
+
+ 1.8. "License" means this document.
+
+ 1.8.1. "Licensable" means having the right to grant, to the maximum
+ extent possible, whether at the time of the initial grant or
+ subsequently acquired, any and all of the rights conveyed herein.
+
+ 1.9. "Modifications" means any addition to or deletion from the
+ substance or structure of either the Original Code or any previous
+ Modifications. When Covered Code is released as a series of files, a
+ Modification is:
+ A. Any addition to or deletion from the contents of a file
+ containing Original Code or previous Modifications.
+
+ B. Any new file that contains any part of the Original Code or
+ previous Modifications.
+
+ 1.10. "Original Code" means Source Code of computer software code
+ which is described in the Source Code notice required by Exhibit A as
+ Original Code, and which, at the time of its release under this
+ License is not already Covered Code governed by this License.
+
+ 1.10.1. "Patent Claims" means any patent claim(s), now owned or
+ hereafter acquired, including without limitation, method, process,
+ and apparatus claims, in any patent Licensable by grantor.
+
+ 1.11. "Source Code" means the preferred form of the Covered Code for
+ making modifications to it, including all modules it contains, plus
+ any associated interface definition files, scripts used to control
+ compilation and installation of an Executable, or source code
+ differential comparisons against either the Original Code or another
+ well known, available Covered Code of the Contributor's choice. The
+ Source Code can be in a compressed or archival form, provided the
+ appropriate decompression or de-archiving software is widely available
+ for no charge.
+
+ 1.12. "You" (or "Your") means an individual or a legal entity
+ exercising rights under, and complying with all of the terms of, this
+ License or a future version of this License issued under Section 6.1.
+ For legal entities, "You" includes any entity which controls, is
+ controlled by, or is under common control with You. For purposes of
+ this definition, "control" means (a) the power, direct or indirect,
+ to cause the direction or management of such entity, whether by
+ contract or otherwise, or (b) ownership of more than fifty percent
+ (50%) of the outstanding shares or beneficial ownership of such
+ entity.
+
+2. Source Code License.
+
+ 2.1. The Initial Developer Grant.
+ The Initial Developer hereby grants You a world-wide, royalty-free,
+ non-exclusive license, subject to third party intellectual property
+ claims:
+ (a) under intellectual property rights (other than patent or
+ trademark) Licensable by Initial Developer to use, reproduce,
+ modify, display, perform, sublicense and distribute the Original
+ Code (or portions thereof) with or without Modifications, and/or
+ as part of a Larger Work; and
+
+ (b) under Patents Claims infringed by the making, using or
+ selling of Original Code, to make, have made, use, practice,
+ sell, and offer for sale, and/or otherwise dispose of the
+ Original Code (or portions thereof).
+
+ (c) the licenses granted in this Section 2.1(a) and (b) are
+ effective on the date Initial Developer first distributes
+ Original Code under the terms of this License.
+
+ (d) Notwithstanding Section 2.1(b) above, no patent license is
+ granted: 1) for code that You delete from the Original Code; 2)
+ separate from the Original Code; or 3) for infringements caused
+ by: i) the modification of the Original Code or ii) the
+ combination of the Original Code with other software or devices.
+
+ 2.2. Contributor Grant.
+ Subject to third party intellectual property claims, each Contributor
+ hereby grants You a world-wide, royalty-free, non-exclusive license
+
+ (a) under intellectual property rights (other than patent or
+ trademark) Licensable by Contributor, to use, reproduce, modify,
+ display, perform, sublicense and distribute the Modifications
+ created by such Contributor (or portions thereof) either on an
+ unmodified basis, with other Modifications, as Covered Code
+ and/or as part of a Larger Work; and
+
+ (b) under Patent Claims infringed by the making, using, or
+ selling of Modifications made by that Contributor either alone
+ and/or in combination with its Contributor Version (or portions
+ of such combination), to make, use, sell, offer for sale, have
+ made, and/or otherwise dispose of: 1) Modifications made by that
+ Contributor (or portions thereof); and 2) the combination of
+ Modifications made by that Contributor with its Contributor
+ Version (or portions of such combination).
+
+ (c) the licenses granted in Sections 2.2(a) and 2.2(b) are
+ effective on the date Contributor first makes Commercial Use of
+ the Covered Code.
+
+ (d) Notwithstanding Section 2.2(b) above, no patent license is
+ granted: 1) for any code that Contributor has deleted from the
+ Contributor Version; 2) separate from the Contributor Version;
+ 3) for infringements caused by: i) third party modifications of
+ Contributor Version or ii) the combination of Modifications made
+ by that Contributor with other software (except as part of the
+ Contributor Version) or other devices; or 4) under Patent Claims
+ infringed by Covered Code in the absence of Modifications made by
+ that Contributor.
+
+3. Distribution Obligations.
+
+ 3.1. Application of License.
+ The Modifications which You create or to which You contribute are
+ governed by the terms of this License, including without limitation
+ Section 2.2. The Source Code version of Covered Code may be
+ distributed only under the terms of this License or a future version
+ of this License released under Section 6.1, and You must include a
+ copy of this License with every copy of the Source Code You
+ distribute. You may not offer or impose any terms on any Source Code
+ version that alters or restricts the applicable version of this
+ License or the recipients' rights hereunder. However, You may include
+ an additional document offering the additional rights described in
+ Section 3.5.
+
+ 3.2. Availability of Source Code.
+ Any Modification which You create or to which You contribute must be
+ made available in Source Code form under the terms of this License
+ either on the same media as an Executable version or via an accepted
+ Electronic Distribution Mechanism to anyone to whom you made an
+ Executable version available; and if made available via Electronic
+ Distribution Mechanism, must remain available for at least twelve (12)
+ months after the date it initially became available, or at least six
+ (6) months after a subsequent version of that particular Modification
+ has been made available to such recipients. You are responsible for
+ ensuring that the Source Code version remains available even if the
+ Electronic Distribution Mechanism is maintained by a third party.
+
+ 3.3. Description of Modifications.
+ You must cause all Covered Code to which You contribute to contain a
+ file documenting the changes You made to create that Covered Code and
+ the date of any change. You must include a prominent statement that
+ the Modification is derived, directly or indirectly, from Original
+ Code provided by the Initial Developer and including the name of the
+ Initial Developer in (a) the Source Code, and (b) in any notice in an
+ Executable version or related documentation in which You describe the
+ origin or ownership of the Covered Code.
+
+ 3.4. Intellectual Property Matters
+ (a) Third Party Claims.
+ If Contributor has knowledge that a license under a third party's
+ intellectual property rights is required to exercise the rights
+ granted by such Contributor under Sections 2.1 or 2.2,
+ Contributor must include a text file with the Source Code
+ distribution titled "LEGAL" which describes the claim and the
+ party making the claim in sufficient detail that a recipient will
+ know whom to contact. If Contributor obtains such knowledge after
+ the Modification is made available as described in Section 3.2,
+ Contributor shall promptly modify the LEGAL file in all copies
+ Contributor makes available thereafter and shall take other steps
+ (such as notifying appropriate mailing lists or newsgroups)
+ reasonably calculated to inform those who received the Covered
+ Code that new knowledge has been obtained.
+
+ (b) Contributor APIs.
+ If Contributor's Modifications include an application programming
+ interface and Contributor has knowledge of patent licenses which
+ are reasonably necessary to implement that API, Contributor must
+ also include this information in the LEGAL file.
+
+ (c) Representations.
+ Contributor represents that, except as disclosed pursuant to
+ Section 3.4(a) above, Contributor believes that Contributor's
+ Modifications are Contributor's original creation(s) and/or
+ Contributor has sufficient rights to grant the rights conveyed by
+ this License.
+
+ 3.5. Required Notices.
+ You must duplicate the notice in Exhibit A in each file of the Source
+ Code. If it is not possible to put such notice in a particular Source
+ Code file due to its structure, then You must include such notice in a
+ location (such as a relevant directory) where a user would be likely
+ to look for such a notice. If You created one or more Modification(s)
+ You may add your name as a Contributor to the notice described in
+ Exhibit A. You must also duplicate this License in any documentation
+ for the Source Code where You describe recipients' rights or ownership
+ rights relating to Covered Code. You may choose to offer, and to
+ charge a fee for, warranty, support, indemnity or liability
+ obligations to one or more recipients of Covered Code. However, You
+ may do so only on Your own behalf, and not on behalf of the Initial
+ Developer or any Contributor. You must make it absolutely clear than
+ any such warranty, support, indemnity or liability obligation is
+ offered by You alone, and You hereby agree to indemnify the Initial
+ Developer and every Contributor for any liability incurred by the
+ Initial Developer or such Contributor as a result of warranty,
+ support, indemnity or liability terms You offer.
+
+ 3.6. Distribution of Executable Versions.
+ You may distribute Covered Code in Executable form only if the
+ requirements of Section 3.1-3.5 have been met for that Covered Code,
+ and if You include a notice stating that the Source Code version of
+ the Covered Code is available under the terms of this License,
+ including a description of how and where You have fulfilled the
+ obligations of Section 3.2. The notice must be conspicuously included
+ in any notice in an Executable version, related documentation or
+ collateral in which You describe recipients' rights relating to the
+ Covered Code. You may distribute the Executable version of Covered
+ Code or ownership rights under a license of Your choice, which may
+ contain terms different from this License, provided that You are in
+ compliance with the terms of this License and that the license for the
+ Executable version does not attempt to limit or alter the recipient's
+ rights in the Source Code version from the rights set forth in this
+ License. If You distribute the Executable version under a different
+ license You must make it absolutely clear that any terms which differ
+ from this License are offered by You alone, not by the Initial
+ Developer or any Contributor. You hereby agree to indemnify the
+ Initial Developer and every Contributor for any liability incurred by
+ the Initial Developer or such Contributor as a result of any such
+ terms You offer.
+
+ 3.7. Larger Works.
+ You may create a Larger Work by combining Covered Code with other code
+ not governed by the terms of this License and distribute the Larger
+ Work as a single product. In such a case, You must make sure the
+ requirements of this License are fulfilled for the Covered Code.
+
+4. Inability to Comply Due to Statute or Regulation.
+
+ If it is impossible for You to comply with any of the terms of this
+ License with respect to some or all of the Covered Code due to
+ statute, judicial order, or regulation then You must: (a) comply with
+ the terms of this License to the maximum extent possible; and (b)
+ describe the limitations and the code they affect. Such description
+ must be included in the LEGAL file described in Section 3.4 and must
+ be included with all distributions of the Source Code. Except to the
+ extent prohibited by statute or regulation, such description must be
+ sufficiently detailed for a recipient of ordinary skill to be able to
+ understand it.
+
+5. Application of this License.
+
+ This License applies to code to which the Initial Developer has
+ attached the notice in Exhibit A and to related Covered Code.
+
+6. Versions of the License.
+
+ 6.1. New Versions.
+ Netscape Communications Corporation ("Netscape") may publish revised
+ and/or new versions of the License from time to time. Each version
+ will be given a distinguishing version number.
+
+ 6.2. Effect of New Versions.
+ Once Covered Code has been published under a particular version of the
+ License, You may always continue to use it under the terms of that
+ version. You may also choose to use such Covered Code under the terms
+ of any subsequent version of the License published by Netscape. No one
+ other than Netscape has the right to modify the terms applicable to
+ Covered Code created under this License.
+
+ 6.3. Derivative Works.
+ If You create or use a modified version of this License (which you may
+ only do in order to apply it to code which is not already Covered Code
+ governed by this License), You must (a) rename Your license so that
+ the phrases "Mozilla", "MOZILLAPL", "MOZPL", "Netscape",
+ "MPL", "NPL" or any confusingly similar phrase do not appear in your
+ license (except to note that your license differs from this License)
+ and (b) otherwise make it clear that Your version of the license
+ contains terms which differ from the Mozilla Public License and
+ Netscape Public License. (Filling in the name of the Initial
+ Developer, Original Code or Contributor in the notice described in
+ Exhibit A shall not of themselves be deemed to be modifications of
+ this License.)
+
+7. DISCLAIMER OF WARRANTY.
+
+ COVERED CODE IS PROVIDED UNDER THIS LICENSE ON AN "AS IS" BASIS,
+ WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
+ WITHOUT LIMITATION, WARRANTIES THAT THE COVERED CODE IS FREE OF
+ DEFECTS, MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE OR NON-INFRINGING.
+ THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE COVERED CODE
+ IS WITH YOU. SHOULD ANY COVERED CODE PROVE DEFECTIVE IN ANY RESPECT,
+ YOU (NOT THE INITIAL DEVELOPER OR ANY OTHER CONTRIBUTOR) ASSUME THE
+ COST OF ANY NECESSARY SERVICING, REPAIR OR CORRECTION. THIS DISCLAIMER
+ OF WARRANTY CONSTITUTES AN ESSENTIAL PART OF THIS LICENSE. NO USE OF
+ ANY COVERED CODE IS AUTHORIZED HEREUNDER EXCEPT UNDER THIS DISCLAIMER.
+
+8. TERMINATION.
+
+ 8.1. This License and the rights granted hereunder will terminate
+ automatically if You fail to comply with terms herein and fail to cure
+ such breach within 30 days of becoming aware of the breach. All
+ sublicenses to the Covered Code which are properly granted shall
+ survive any termination of this License. Provisions which, by their
+ nature, must remain in effect beyond the termination of this License
+ shall survive.
+
+ 8.2. If You initiate litigation by asserting a patent infringement
+ claim (excluding declatory judgment actions) against Initial Developer
+ or a Contributor (the Initial Developer or Contributor against whom
+ You file such action is referred to as "Participant") alleging that:
+
+ (a) such Participant's Contributor Version directly or indirectly
+ infringes any patent, then any and all rights granted by such
+ Participant to You under Sections 2.1 and/or 2.2 of this License
+ shall, upon 60 days notice from Participant terminate prospectively,
+ unless if within 60 days after receipt of notice You either: (i)
+ agree in writing to pay Participant a mutually agreeable reasonable
+ royalty for Your past and future use of Modifications made by such
+ Participant, or (ii) withdraw Your litigation claim with respect to
+ the Contributor Version against such Participant. If within 60 days
+ of notice, a reasonable royalty and payment arrangement are not
+ mutually agreed upon in writing by the parties or the litigation claim
+ is not withdrawn, the rights granted by Participant to You under
+ Sections 2.1 and/or 2.2 automatically terminate at the expiration of
+ the 60 day notice period specified above.
+
+ (b) any software, hardware, or device, other than such Participant's
+ Contributor Version, directly or indirectly infringes any patent, then
+ any rights granted to You by such Participant under Sections 2.1(b)
+ and 2.2(b) are revoked effective as of the date You first made, used,
+ sold, distributed, or had made, Modifications made by that
+ Participant.
+
+ 8.3. If You assert a patent infringement claim against Participant
+ alleging that such Participant's Contributor Version directly or
+ indirectly infringes any patent where such claim is resolved (such as
+ by license or settlement) prior to the initiation of patent
+ infringement litigation, then the reasonable value of the licenses
+ granted by such Participant under Sections 2.1 or 2.2 shall be taken
+ into account in determining the amount or value of any payment or
+ license.
+
+ 8.4. In the event of termination under Sections 8.1 or 8.2 above,
+ all end user license agreements (excluding distributors and resellers)
+ which have been validly granted by You or any distributor hereunder
+ prior to termination shall survive termination.
+
+9. LIMITATION OF LIABILITY.
+
+ UNDER NO CIRCUMSTANCES AND UNDER NO LEGAL THEORY, WHETHER TORT
+ (INCLUDING NEGLIGENCE), CONTRACT, OR OTHERWISE, SHALL YOU, THE INITIAL
+ DEVELOPER, ANY OTHER CONTRIBUTOR, OR ANY DISTRIBUTOR OF COVERED CODE,
+ OR ANY SUPPLIER OF ANY OF SUCH PARTIES, BE LIABLE TO ANY PERSON FOR
+ ANY INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES OF ANY
+ CHARACTER INCLUDING, WITHOUT LIMITATION, DAMAGES FOR LOSS OF GOODWILL,
+ WORK STOPPAGE, COMPUTER FAILURE OR MALFUNCTION, OR ANY AND ALL OTHER
+ COMMERCIAL DAMAGES OR LOSSES, EVEN IF SUCH PARTY SHALL HAVE BEEN
+ INFORMED OF THE POSSIBILITY OF SUCH DAMAGES. THIS LIMITATION OF
+ LIABILITY SHALL NOT APPLY TO LIABILITY FOR DEATH OR PERSONAL INJURY
+ RESULTING FROM SUCH PARTY'S NEGLIGENCE TO THE EXTENT APPLICABLE LAW
+ PROHIBITS SUCH LIMITATION. SOME JURISDICTIONS DO NOT ALLOW THE
+ EXCLUSION OR LIMITATION OF INCIDENTAL OR CONSEQUENTIAL DAMAGES, SO
+ THIS EXCLUSION AND LIMITATION MAY NOT APPLY TO YOU.
+
+10. U.S. GOVERNMENT END USERS.
+
+ The Covered Code is a "commercial item," as that term is defined in
+ 48 C.F.R. 2.101 (Oct. 1995), consisting of "commercial computer
+ software" and "commercial computer software documentation," as such
+ terms are used in 48 C.F.R. 12.212 (Sept. 1995). Consistent with 48
+ C.F.R. 12.212 and 48 C.F.R. 227.7202-1 through 227.7202-4 (June 1995),
+ all U.S. Government End Users acquire Covered Code with only those
+ rights set forth herein.
+
+11. MISCELLANEOUS.
+
+ This License represents the complete agreement concerning subject
+ matter hereof. If any provision of this License is held to be
+ unenforceable, such provision shall be reformed only to the extent
+ necessary to make it enforceable. This License shall be governed by
+ California law provisions (except to the extent applicable law, if
+ any, provides otherwise), excluding its conflict-of-law provisions.
+ With respect to disputes in which at least one party is a citizen of,
+ or an entity chartered or registered to do business in the United
+ States of America, any litigation relating to this License shall be
+ subject to the jurisdiction of the Federal Courts of the Northern
+ District of California, with venue lying in Santa Clara County,
+ California, with the losing party responsible for costs, including
+ without limitation, court costs and reasonable attorneys' fees and
+ expenses. The application of the United Nations Convention on
+ Contracts for the International Sale of Goods is expressly excluded.
+ Any law or regulation which provides that the language of a contract
+ shall be construed against the drafter shall not apply to this
+ License.
+
+12. RESPONSIBILITY FOR CLAIMS.
+
+ As between Initial Developer and the Contributors, each party is
+ responsible for claims and damages arising, directly or indirectly,
+ out of its utilization of rights under this License and You agree to
+ work with Initial Developer and Contributors to distribute such
+ responsibility on an equitable basis. Nothing herein is intended or
+ shall be deemed to constitute any admission of liability.
+
+13. MULTIPLE-LICENSED CODE.
+
+ Initial Developer may designate portions of the Covered Code as
+ "Multiple-Licensed". "Multiple-Licensed" means that the Initial
+ Developer permits you to utilize portions of the Covered Code under
+ Your choice of the NPL or the alternative licenses, if any, specified
+ by the Initial Developer in the file described in Exhibit A.
+
+EXHIBIT A -Mozilla Public License.
+
+ ``The contents of this file are subject to the Mozilla Public License
+ Version 1.1 (the "License"); you may not use this file except in
+ compliance with the License. You may obtain a copy of the License at
+ http://www.mozilla.org/MPL/
+
+ Software distributed under the License is distributed on an "AS IS"
+ basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+ License for the specific language governing rights and limitations
+ under the License.
+
+ The Original Code is RabbitMQ.
+
+ The Initial Developers of the Original Code are LShift Ltd,
+ Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+
+ Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+ Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+ are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+ Technologies LLC, and Rabbit Technologies Ltd.
+
+ Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+ Ltd. Portions created by Cohesive Financial Technologies LLC are
+ Copyright (C) 2007-2009 Cohesive Financial Technologies
+ LLC. Portions created by Rabbit Technologies Ltd are Copyright
+ (C) 2007-2009 Rabbit Technologies Ltd.
+
+ All Rights Reserved.
+
+ Contributor(s): ______________________________________.''
+
+ [NOTE: The text of this Exhibit A may differ slightly from the text of
+ the notices in the Source Code files of the Original Code. You should
+ use the text of this Exhibit A rather than the text found in the
+ Original Code Source Code for Your Modifications.]
+
+
+
19 Makefile
@@ -0,0 +1,19 @@
+SIBLING_CODEGEN_DIR=../rabbitmq-codegen/
+AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen)
+AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json
+
+PYTHON=python
+
+all: rabbitmq/spec.py
+
+rabbitmq/spec.py: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH)
+ $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@
+
+clean:
+ rm -f rabbitmq/spec.py
+ rm -f rabbitmq/*.pyc
+
+codegen:
+ mkdir -p $@
+ cp -r "$(AMQP_CODEGEN_DIR)"/* $@
+ $(MAKE) -C $@ clean
345 codegen.py
@@ -0,0 +1,345 @@
+## The contents of this file are subject to the Mozilla Public License
+## Version 1.1 (the "License"); you may not use this file except in
+## compliance with the License. You may obtain a copy of the License at
+## http://www.mozilla.org/MPL/
+##
+## Software distributed under the License is distributed on an "AS IS"
+## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+## License for the specific language governing rights and limitations
+## under the License.
+##
+## The Original Code is RabbitMQ.
+##
+## The Initial Developers of the Original Code are LShift Ltd,
+## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+## Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+## Ltd. Portions created by Cohesive Financial Technologies LLC are
+## Copyright (C) 2007-2009 Cohesive Financial Technologies
+## LLC. Portions created by Rabbit Technologies Ltd are Copyright
+## (C) 2007-2009 Rabbit Technologies Ltd.
+##
+## All Rights Reserved.
+##
+## Contributor(s): ______________________________________.
+##
+
+from __future__ import nested_scopes
+
+import sys
+sys.path.append("../rabbitmq-codegen") # in case we're next to an experimental revision
+sys.path.append("codegen") # in case we're building from a distribution package
+
+from amqp_codegen import *
+import string
+import re
+
+DRIVER_METHODS = {
+ "Exchange.Declare": ["Exchange.DeclareOk"],
+ "Exchange.Delete": ["Exchange.DeleteOk"],
+ "Queue.Declare": ["Queue.DeclareOk"],
+ "Queue.Bind": ["Queue.BindOk"],
+ "Queue.Purge": ["Queue.PurgeOk"],
+ "Queue.Delete": ["Queue.DeleteOk"],
+ "Queue.Unbind": ["Queue.UnbindOk"],
+ "Basic.Qos": ["Basic.QosOk"],
+ "Basic.Get": ["Basic.GetOk", "Basic.GetEmpty"],
+ "Basic.Ack": [],
+ "Basic.Reject": [],
+ "Basic.Recover": [],
+ "Tx.Select": ["Tx.SelectOk"],
+ "Tx.Commit": ["Tx.CommitOk"],
+ "Tx.Rollback": ["Tx.RollbackOk"]
+ }
+
+def normalize_separators(s):
+ s = s.replace('-', '_')
+ s = s.replace(' ', '_')
+ return s
+
+def pyize(s):
+ s = normalize_separators(s)
+ if s in ('global', 'class'): s = s + '_'
+ return s
+
+def camel(s):
+ return normalize_separators(s).title().replace('_', '')
+
+AmqpMethod.structName = lambda m: camel(m.klass.name) + '.' + camel(m.name)
+AmqpClass.structName = lambda c: camel(c.name) + "Properties"
+
+def constantName(s):
+ return '_'.join(re.split('[- ]', s.upper()))
+
+def flagName(c, f):
+ if c:
+ return c.structName() + '.' + constantName('flag_' + f.name)
+ else:
+ return constantName('flag_' + f.name)
+
+def gen(spec):
+ def genSingleDecode(prefix, cLvalue, unresolved_domain):
+ type = spec.resolveDomain(unresolved_domain)
+ if type == 'shortstr':
+ print prefix + "length = struct.unpack_from('B', encoded, offset)[0]"
+ print prefix + "offset = offset + 1"
+ print prefix + "%s = encoded[offset : offset + length]" % (cLvalue,)
+ print prefix + "offset = offset + length"
+ elif type == 'longstr':
+ print prefix + "length = struct.unpack_from('>I', encoded, offset)[0]"
+ print prefix + "offset = offset + 4"
+ print prefix + "%s = encoded[offset : offset + length]" % (cLvalue,)
+ print prefix + "offset = offset + length"
+ elif type == 'octet':
+ print prefix + "%s = struct.unpack_from('B', encoded, offset)[0]" % (cLvalue,)
+ print prefix + "offset = offset + 1"
+ elif type == 'short':
+ print prefix + "%s = struct.unpack_from('>H', encoded, offset)[0]" % (cLvalue,)
+ print prefix + "offset = offset + 2"
+ elif type == 'long':
+ print prefix + "%s = struct.unpack_from('>I', encoded, offset)[0]" % (cLvalue,)
+ print prefix + "offset = offset + 4"
+ elif type == 'longlong':
+ print prefix + "%s = struct.unpack_from('>Q', encoded, offset)[0]" % (cLvalue,)
+ print prefix + "offset = offset + 8"
+ elif type == 'timestamp':
+ print prefix + "%s = struct.unpack_from('>Q', encoded, offset)[0]" % (cLvalue,)
+ print prefix + "offset = offset + 8"
+ elif type == 'bit':
+ raise "Can't decode bit in genSingleDecode"
+ elif type == 'table':
+ print prefix + "(%s, offset) = rabbitmq.table.decode_table(encoded, offset)" % \
+ (cLvalue,)
+ else:
+ raise "Illegal domain in genSingleDecode", type
+
+ def genSingleEncode(prefix, cValue, unresolved_domain):
+ type = spec.resolveDomain(unresolved_domain)
+ if type == 'shortstr':
+ print prefix + "pieces.append(struct.pack('B', len(%s)))" % (cValue,)
+ print prefix + "pieces.append(%s)" % (cValue,)
+ elif type == 'longstr':
+ print prefix + "pieces.append(struct.pack('>I', len(%s)))" % (cValue,)
+ print prefix + "pieces.append(%s)" % (cValue,)
+ elif type == 'octet':
+ print prefix + "pieces.append(struct.pack('B', %s))" % (cValue,)
+ elif type == 'short':
+ print prefix + "pieces.append(struct.pack('>H', %s))" % (cValue,)
+ elif type == 'long':
+ print prefix + "pieces.append(struct.pack('>I', %s))" % (cValue,)
+ elif type == 'longlong':
+ print prefix + "pieces.append(struct.pack('>Q', %s))" % (cValue,)
+ elif type == 'timestamp':
+ print prefix + "pieces.append(struct.pack('>Q', %s))" % (cValue,)
+ elif type == 'bit':
+ raise "Can't encode bit in genSingleEncode"
+ elif type == 'table':
+ print prefix + "rabbitmq.table.encode_table(pieces, %s)" % (cValue,)
+ else:
+ raise "Illegal domain in genSingleEncode", type
+
+ def genDecodeMethodFields(m):
+ print " def decode(self, encoded, offset = 0):"
+ bitindex = None
+ for f in m.arguments:
+ if spec.resolveDomain(f.domain) == 'bit':
+ if bitindex is None:
+ bitindex = 0
+ if bitindex >= 8:
+ bitindex = 0
+ if bitindex == 0:
+ print " bit_buffer = struct.unpack_from('B', encoded, offset)[0]"
+ print " offset = offset + 1"
+ print " self.%s = (bit_buffer & (1 << %d)) != 0" % \
+ (pyize(f.name), bitindex)
+ bitindex = bitindex + 1
+ else:
+ bitindex = None
+ genSingleDecode(" ", "self.%s" % (pyize(f.name),), f.domain)
+ print " return self"
+ print
+
+ def genDecodeProperties(c):
+ print " def decode(self, encoded, offset = 0):"
+ print " flags = 0"
+ print " flagword_index = 0"
+ print " while True:"
+ print " partial_flags = struct.unpack_from('>H', encoded, offset)[0]"
+ print " offset = offset + 2"
+ print " flags = flags | (partial_flags << (flagword_index * 16))"
+ print " if (partial_flags & 1) == 0: break"
+ print " flagword_index = flagword_index + 1"
+ for f in c.fields:
+ if spec.resolveDomain(f.domain) == 'bit':
+ print " self.%s = (flags & %s) != 0" % (pyize(f.name), flagName(c, f))
+ else:
+ print " if (flags & %s):" % (flagName(c, f),)
+ genSingleDecode(" ", "self.%s" % (pyize(f.name),), f.domain)
+ print " else:"
+ print " self.%s = None" % (pyize(f.name),)
+ print " return self"
+ print
+
+ def genEncodeMethodFields(m):
+ print " def encode(self):"
+ print " pieces = []"
+ bitindex = None
+ def finishBits():
+ if bitindex is not None:
+ print " pieces.append(struct.pack('B', bit_buffer))"
+ for f in m.arguments:
+ if spec.resolveDomain(f.domain) == 'bit':
+ if bitindex is None:
+ bitindex = 0
+ print " bit_buffer = 0;"
+ if bitindex >= 8:
+ finishBits()
+ print " bit_buffer = 0;"
+ bitindex = 0
+ print " if self.%s: bit_buffer = bit_buffer | (1 << %d)" % \
+ (pyize(f.name), bitindex)
+ bitindex = bitindex + 1
+ else:
+ finishBits()
+ bitindex = None
+ genSingleEncode(" ", "self.%s" % (pyize(f.name),), f.domain)
+ finishBits()
+ print " return pieces"
+ print
+
+ def genEncodeProperties(c):
+ print " def encode(self):"
+ print " pieces = []"
+ print " flags = 0"
+ for f in c.fields:
+ if spec.resolveDomain(f.domain) == 'bit':
+ print " if self.%s: flags = flags | %s" % (pyize(f.name), flagName(c, f))
+ else:
+ print " if self.%s is not None:" % (pyize(f.name),)
+ print " flags = flags | %s" % (flagName(c, f),)
+ genSingleEncode(" ", "self.%s" % (pyize(f.name),), f.domain)
+ print " flag_pieces = []"
+ print " while True:"
+ print " remainder = flags >> 16"
+ print " partial_flags = flags & 0xFFFE"
+ print " if remainder != 0: partial_flags = partial_flags | 1"
+ print " flag_pieces.append(struct.pack('>H', partial_flags))"
+ print " flags = remainder"
+ print " if flags == 0: break"
+ print " return flag_pieces + pieces"
+ print
+
+ def fieldDeclList(fields):
+ return ''.join([", %s = %s" % (pyize(f.name), repr(f.defaultvalue)) for f in fields])
+
+ def fieldInitList(prefix, fields):
+ if fields:
+ return ''.join(["%sself.%s = %s\n" % (prefix, pyize(f.name), pyize(f.name)) \
+ for f in fields])
+ else:
+ return '%spass' % (prefix,)
+
+ print '# Autogenerated code, do not edit'
+ print
+ print 'import struct'
+ print 'import rabbitmq.specbase'
+ print 'import rabbitmq.table'
+ print
+ print "PROTOCOL_VERSION = (%d, %d)" % (spec.major, spec.minor)
+ print "PORT = %d" % (spec.port)
+ print
+
+ for (c,v,cls) in spec.constants:
+ print "%s = %s" % (constantName(c), v)
+ print
+
+ for c in spec.allClasses():
+ print 'class %s(rabbitmq.specbase.Class):' % (camel(c.name),)
+ print " INDEX = 0x%.04X ## %d" % (c.index, c.index)
+ print " NAME = %s" % (repr(camel(c.name)),)
+ print
+
+ for m in c.allMethods():
+ print ' class %s(rabbitmq.specbase.Method):' % (camel(m.name),)
+ methodid = m.klass.index << 16 | m.index
+ print " INDEX = 0x%.08X ## %d, %d; %d" % \
+ (methodid,
+ m.klass.index,
+ m.index,
+ methodid)
+ print " NAME = %s" % (repr(m.structName(),))
+ print " def __init__(self%s):" % (fieldDeclList(m.arguments),)
+ print fieldInitList(' ', m.arguments)
+ genDecodeMethodFields(m)
+ genEncodeMethodFields(m)
+
+ for c in spec.allClasses():
+ if c.fields:
+ print 'class %s(rabbitmq.specbase.Properties):' % (c.structName(),)
+ print " CLASS = %s" % (camel(c.name),)
+ print " INDEX = 0x%.04X ## %d" % (c.index, c.index)
+ print " NAME = %s" % (repr(c.structName(),))
+
+ index = 0
+ if c.fields:
+ for f in c.fields:
+ if index % 16 == 15:
+ index = index + 1
+ shortnum = index / 16
+ partialindex = 15 - (index % 16)
+ bitindex = shortnum * 16 + partialindex
+ print ' %s = (1 << %d)' % (flagName(None, f), bitindex)
+ index = index + 1
+ print
+
+ print " def __init__(self%s):" % (fieldDeclList(c.fields),)
+ print fieldInitList(' ', c.fields)
+ genDecodeProperties(c)
+ genEncodeProperties(c)
+
+ print "methods = {"
+ print ',\n'.join([" 0x%08X: %s" % (m.klass.index << 16 | m.index, m.structName()) \
+ for m in spec.allMethods()])
+ print "}"
+ print
+
+ print "props = {"
+ print ',\n'.join([" 0x%04X: %s" % (c.index, c.structName()) \
+ for c in spec.allClasses() \
+ if c.fields])
+ print "}"
+ print
+
+ print "def has_content(methodNumber):"
+ for m in spec.allMethods():
+ if m.hasContent:
+ print ' if methodNumber == %s.INDEX: return True' % (m.structName())
+ print " return False"
+ print
+
+ print "class DriverMixin:"
+ for m in spec.allMethods():
+ if m.structName() in DRIVER_METHODS:
+ acceptable_replies = DRIVER_METHODS[m.structName()]
+ print " def %s(self%s):" % (pyize(m.klass.name + '_' + m.name),
+ fieldDeclList(m.arguments))
+ print " return self.handler._rpc(%s(%s)," % \
+ (m.structName(), ', '.join(["%s = %s" % (pyize(f.name), pyize(f.name))
+ for f in m.arguments]))
+ print " [%s])" % \
+ (', '.join(acceptable_replies),)
+ print
+
+def generate(specPath):
+ gen(AmqpSpec(specPath))
+
+def dummyGenerate(specPath):
+ pass
+
+if __name__ == "__main__":
+ do_main(dummyGenerate, generate)
39 examples/t.py
@@ -0,0 +1,39 @@
+import rabbitmq
+import asyncore
+
+def t_codec():
+ import rabbitmq.spec
+ import datetime
+
+ p = rabbitmq.BasicProperties(content_type = 'text/plain',
+ delivery_mode = 2,
+ headers = {'hello': 'world',
+ 'time': datetime.datetime.utcnow()})
+ print rabbitmq.BasicProperties()
+ pe = ''.join(p.encode())
+ print pe.encode('hex')
+ print rabbitmq.BasicProperties().decode(pe)
+
+ m = rabbitmq.spec.Connection.Start(server_properties = {"prop1": "hello",
+ "prop2": 123})
+ print m
+ me = ''.join(m.encode())
+ print me.encode('hex')
+ print rabbitmq.spec.Connection.Start().decode(me)
+
+# t_codec()
+
+def handle_delivery(method, header, body):
+ print (method, header, body)
+ ch.basic_ack(delivery_tag = method.delivery_tag)
+ if body == 'quit':
+ ch.basic_cancel(tag)
+ ch.close()
+ c.close()
+
+c = rabbitmq.Connection('127.0.0.1')
+ch = c.channel()
+ch.queue_declare(queue = "test")
+tag = ch.basic_consume(handle_delivery, queue = 'test')
+ch.basic_publish('', "test", "hello!", rabbitmq.BasicProperties(content_type = "text/plain"))
+asyncore.loop()
2  rabbitmq/__init__.py
@@ -0,0 +1,2 @@
+from rabbitmq.spec import BasicProperties
+from rabbitmq.connection import PlainCredentials, Connection
149 rabbitmq/channel.py
@@ -0,0 +1,149 @@
+import asyncore
+import rabbitmq.spec as spec
+import rabbitmq.codec as codec
+
+class ChannelHandler:
+ def __init__(self, connection, channel_number = None):
+ self.connection = connection
+ self.inbound = []
+ self.frame_handler = self._handle_method
+ self.channel_close = None
+ self.async_map = {}
+
+ if channel_number is None:
+ self.channel_number = connection._next_channel_number()
+ else:
+ self.channel_number = channel_number
+ connection._set_channel(self.channel_number, self)
+
+ def _async_channel_close(self, method_frame, header_frame, body):
+ self._set_channel_close(method_frame.method)
+ self.connection.send_method(self.channel_number, spec.Channel.CloseOk())
+
+ def _ensure(self):
+ if self.channel_close:
+ raise ChannelClosed(self.channel_close)
+ return self
+
+ def _set_channel_close(self, c):
+ if not self.channel_close:
+ self.channel_close = c
+ self.connection.reset_channel(self.channel_number)
+
+ def wait_for_reply(self, acceptable_replies):
+ if not acceptable_replies:
+ # One-way.
+ return
+ index = 0
+ while True:
+ self._ensure()
+ while index >= len(self.inbound):
+ asyncore.loop(count = 1)
+ while index < len(self.inbound):
+ frame = self.inbound[index][0]
+ if isinstance(frame, codec.FrameMethod):
+ reply = frame.method
+ if reply.__class__ in acceptable_replies:
+ self.inbound[index:index+1] = []
+ return reply
+ index = index + 1
+
+ def _handle_async(self, method_frame, header_frame, body):
+ method = method_frame.method
+ if method.__class__ in self.async_map:
+ self.async_map[method.__class__](method_frame, header_frame, body)
+ else:
+ self.inbound.append((method_frame, header_frame, body))
+
+ def _handle_method(self, frame):
+ if not isinstance(frame, codec.FrameMethod):
+ raise UnexpectedFrameError(frame)
+ if spec.has_content(frame.method.INDEX):
+ self.frame_handler = self._make_header_handler(frame)
+ else:
+ self._handle_async(frame, None, None)
+
+ def _make_header_handler(self, method_frame):
+ def handler(header_frame):
+ if not isinstance(header_frame, codec.FrameHeader):
+ raise UnexpectedFrameError(header_frame)
+ self.frame_handler = self._make_body_handler(method_frame, header_frame)
+ return handler
+
+ def _make_body_handler(self, method_frame, header_frame):
+ seen_so_far = [0]
+ body_fragments = []
+ def handler(body_frame):
+ if not isinstance(body_frame, codec.FrameBody):
+ raise UnexpectedFrameError(body_frame)
+ fragment = body_frame.fragment
+ seen_so_far[0] = seen_so_far[0] + len(fragment)
+ body_fragments.append(fragment)
+ if seen_so_far[0] == header_frame.body_size:
+ self.frame_handler = self._handle_method
+ self._handle_async(method_frame, header_frame, ''.join(body_fragments))
+ elif seen_so_far[0] > header_frame.body_size:
+ raise BodyTooLongError()
+ else:
+ pass
+ return handler
+
+ def _rpc(self, method, acceptable_replies):
+ return self.connection._rpc(self.channel_number, method, acceptable_replies)
+
+class Channel(spec.DriverMixin):
+ def __init__(self, handler):
+ self.handler = handler
+ self.callbacks = {}
+ self.next_consumer_tag = 0
+
+ handler.async_map[spec.Channel.Close] = handler._async_channel_close
+
+ handler.async_map[spec.Basic.Deliver] = self._async_basic_deliver
+ handler.async_map[spec.Basic.Return] = self._async_basic_return
+ handler.async_map[spec.Channel.Flow] = self._async_channel_flow
+
+ self.handler._rpc(spec.Channel.Open(), [spec.Channel.OpenOk])
+
+ def _async_basic_deliver(self, method_frame, header_frame, body):
+ self.callbacks[method_frame.method.consumer_tag](method_frame.method,
+ header_frame.properties,
+ body)
+
+ def _async_basic_return(self, method_frame, header_frame, body):
+ raise "Unimplemented"
+
+ def _async_channel_flow(self, method_frame, header_frame, body):
+ raise "Unimplemented"
+
+ def close(self):
+ c = spec.Channel.Close(reply_code = 0,
+ reply_text = 'Normal close',
+ class_id = 0,
+ method_id = 0)
+ self.handler._rpc(c, [spec.Channel.CloseOk])
+ self.handler._set_channel_close(c)
+
+ def basic_publish(self, exchange, routing_key, body, properties = None, mandatory = False, immediate = False):
+ properties = properties or spec.BasicProperties()
+ self.handler.connection.send_method(self.handler.channel_number,
+ spec.Basic.Publish(exchange = exchange,
+ routing_key = routing_key,
+ mandatory = mandatory,
+ immediate = immediate),
+ (properties, body))
+
+ def basic_consume(self, consumer, queue = '', no_ack = False, exclusive = False):
+ tag = 'ctag' + str(self.next_consumer_tag)
+ self.next_consumer_tag = self.next_consumer_tag + 1
+ self.callbacks[tag] = consumer
+ return self.handler._rpc(spec.Basic.Consume(queue = queue,
+ consumer_tag = tag,
+ no_ack = no_ack,
+ exclusive = exclusive),
+ [spec.Basic.ConsumeOk]).consumer_tag
+
+ def basic_cancel(self, consumer_tag):
+ self.handler._rpc(spec.Basic.Cancel(consumer_tag = consumer_tag),
+ [spec.Basic.CancelOk])
+ del self.callbacks[consumer_tag]
164 rabbitmq/codec.py
@@ -0,0 +1,164 @@
+import struct
+import rabbitmq.spec as spec
+
+from rabbitmq.exceptions import *
+
+class Frame:
+ def __init__(self, frame_type, channel_number):
+ self.frame_type = frame_type
+ self.channel_number = channel_number
+
+ def _marshal(self, pieces):
+ payload = ''.join(pieces)
+ return struct.pack('>BHI', self.frame_type, self.channel_number, len(payload)) + \
+ payload + chr(spec.FRAME_END)
+
+ def __repr__(self):
+ import rabbitmq.specbase
+ return rabbitmq.specbase._codec_repr(self, lambda: Frame(-1, -1))
+
+class FrameMethod(Frame):
+ def __init__(self, channel_number, method):
+ Frame.__init__(self, spec.FRAME_METHOD, channel_number)
+ self.method = method
+
+ def marshal(self):
+ pieces = self.method.encode()
+ pieces.insert(0, struct.pack('>I', self.method.INDEX))
+ return self._marshal(pieces)
+
+class FrameHeader(Frame):
+ def __init__(self, channel_number, body_size, props):
+ Frame.__init__(self, spec.FRAME_HEADER, channel_number)
+ self.body_size = body_size
+ self.properties = props
+
+ def marshal(self):
+ pieces = self.properties.encode()
+ pieces.insert(0, struct.pack('>HxxQ', self.properties.INDEX, self.body_size))
+ return self._marshal(pieces)
+
+class FrameBody(Frame):
+ def __init__(self, channel_number, fragment):
+ Frame.__init__(self, spec.FRAME_BODY, channel_number)
+ self.fragment = fragment
+
+ def marshal(self):
+ return self._marshal([self.fragment])
+
+class FrameHeartbeat(Frame):
+ def __init__(self):
+ Frame.__init__(self, spec.FRAME_HEARTBEAT, 0)
+
+ def marshal(self):
+ return self._marshal([])
+
+class FrameProtocolHeader(Frame):
+ def __init__(self, th, tl, vh, vl):
+ Frame.__init__(self, -1, -1)
+ self.transport_high = th
+ self.transport_low = tl
+ self.protocol_version_major = vh
+ self.protocol_version_minor = vl
+
+ def marshal(self):
+ return 'AMQP' + struct.pack('BBBB',
+ self.transport_high,
+ self.transport_low,
+ self.protocol_version_major,
+ self.protocol_version_minor)
+
+class ConnectionState:
+ HEADER_SIZE = 7
+ FOOTER_SIZE = 1
+
+ def __init__(self):
+ self.channel_max = None
+ self.frame_max = None
+ self._return_to_idle()
+
+ def tune(self, channel_max, frame_max):
+ self.channel_max = channel_max
+ self.frame_max = frame_max
+
+ def _return_to_idle(self):
+ self.inbound_buffer = []
+ self.inbound_available = 0
+ self.target_size = ConnectionState.HEADER_SIZE
+ self.state = self._waiting_for_header
+
+ def _inbound(self):
+ return ''.join(self.inbound_buffer)
+
+ def handle_input(self, received_data):
+ total_bytes_consumed = 0
+
+ while True:
+ if not received_data:
+ return (total_bytes_consumed, None)
+
+ bytes_consumed = self.target_size - self.inbound_available
+ if len(received_data) < bytes_consumed:
+ bytes_consumed = len(received_data)
+
+ self.inbound_buffer.append(received_data[:bytes_consumed])
+ self.inbound_available = self.inbound_available + bytes_consumed
+ received_data = received_data[bytes_consumed:]
+ total_bytes_consumed = total_bytes_consumed + bytes_consumed
+
+ if self.inbound_available < self.target_size:
+ return (total_bytes_consumed, None)
+
+ maybe_result = self.state(self._inbound())
+ if maybe_result:
+ return (total_bytes_consumed, maybe_result)
+
+ def _waiting_for_header(self, inbound):
+ # Here we switch state without resetting the inbound_buffer,
+ # because we want to keep the frame header.
+
+ if inbound[:3] == 'AMQ':
+ # Protocol header.
+ self.target_size = 8
+ self.state = self._waiting_for_protocol_header
+ else:
+ self.target_size = struct.unpack_from('>I', inbound, 3)[0] + \
+ ConnectionState.HEADER_SIZE + \
+ ConnectionState.FOOTER_SIZE
+ self.state = self._waiting_for_body
+
+ def _waiting_for_body(self, inbound):
+ if ord(inbound[-1]) != spec.FRAME_END:
+ raise InvalidFrameError("Invalid frame end byte", inbound[-1])
+
+ self._return_to_idle()
+
+ (frame_type, channel_number) = struct.unpack_from('>BH', inbound, 0)
+ if frame_type == spec.FRAME_METHOD:
+ method_id = struct.unpack_from('>I', inbound, ConnectionState.HEADER_SIZE)[0]
+ method = spec.methods[method_id]()
+ method.decode(inbound, ConnectionState.HEADER_SIZE + 4)
+ return FrameMethod(channel_number, method)
+ elif frame_type == spec.FRAME_HEADER:
+ (class_id, body_size) = struct.unpack_from('>HxxQ', inbound,
+ ConnectionState.HEADER_SIZE)
+ props = spec.props[class_id]()
+ props.decode(inbound, ConnectionState.HEADER_SIZE + 12)
+ return FrameHeader(channel_number, body_size, props)
+ elif frame_type == spec.FRAME_BODY:
+ return FrameBody(channel_number,
+ inbound[ConnectionState.HEADER_SIZE : -ConnectionState.FOOTER_SIZE])
+ elif frame_type == spec.FRAME_HEARTBEAT:
+ return FrameHeartbeat()
+ else:
+ # Ignore the frame.
+ return None
+
+ def _waiting_for_protocol_header(self, inbound):
+ if inbound[3] != 'P':
+ raise InvalidProtocolHeader(inbound)
+
+ self._return_to_idle()
+
+ (th, tl, vh, vl) = struct.unpack_from('BBBB', inbound, 4)
+ return FrameProtocolHeader(th, tl, vh, vl)
238 rabbitmq/connection.py
@@ -0,0 +1,238 @@
+import asyncore
+import socket
+
+import rabbitmq.spec as spec
+import rabbitmq.codec as codec
+import rabbitmq.channel as channel
+from rabbitmq.exceptions import *
+
+class PlainCredentials:
+ def __init__(self, username, password):
+ self.username = username
+ self.password = password
+
+ def response_for(self, start):
+ if 'PLAIN' not in start.mechanisms.split():
+ return None
+ return ('PLAIN', '\0%s\0%s' % (self.username, self.password))
+
+class ConnectionParameters:
+ def __init__(self, channel_max = 0, frame_max = 131072, heartbeat = 0):
+ self.channel_max = channel_max
+ self.frame_max = frame_max
+ self.heartbeat = heartbeat
+
+class Connection(asyncore.dispatcher):
+ def __init__(self,
+ host,
+ port = None,
+ virtual_host = "/",
+ credentials = None,
+ parameters = None,
+ wait_for_open = True):
+ asyncore.dispatcher.__init__(self)
+
+ self.state = codec.ConnectionState()
+ self.credentials = credentials or PlainCredentials('guest', 'guest')
+ self.virtual_host = virtual_host
+ self.parameters = parameters or ConnectionParameters()
+ self.outbound_frames = []
+ self.frame_handler = self._login1
+ self.connection_open = False
+ self.connection_close = None
+ self.channels = {}
+ self.next_channel = 0
+
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.connect((host, port or spec.PORT))
+ self.send_frame(self._local_protocol_header())
+
+ if wait_for_open:
+ self.wait_for_open()
+
+ def _local_protocol_header(self):
+ return codec.FrameProtocolHeader(1,
+ 1,
+ spec.PROTOCOL_VERSION[0],
+ spec.PROTOCOL_VERSION[1])
+
+ def handle_connect(self):
+ pass
+
+ def _set_connection_close(self, c):
+ if not self.connection_close:
+ self.connection_close = c
+ for chan in self.channels.values():
+ chan._set_channel_close(c)
+
+ def close(self):
+ if self.connection_open:
+ self.connection_open = False
+ c = spec.Connection.Close(reply_code = 200,
+ reply_text = 'Normal shutdown',
+ class_id = 0,
+ method_id = 0)
+ self._rpc(0, c, [spec.Connection.CloseOk])
+ self._set_connection_close(c)
+ asyncore.dispatcher.close(self)
+
+ def handle_close(self):
+ self._set_connection_close(spec.Connection.Close(reply_code = 0,
+ reply_text = 'Socket closed',
+ class_id = 0,
+ method_id = 0))
+ self.close()
+
+ def handle_read(self):
+ b = self.state.channel_max
+ if not b: b = 131072
+
+ try:
+ buf = self.recv(b)
+ except socket.error:
+ self.handle_close()
+ raise
+
+ if not buf:
+ self.close()
+ return
+
+ while buf:
+ (consumed_count, frame) = self.state.handle_input(buf)
+ buf = buf[consumed_count:]
+ if frame:
+ self.frame_handler(frame)
+
+ def writable(self):
+ return (len(self.outbound_frames) > 0)
+
+ def handle_write(self):
+ frame = self.outbound_frames.pop(0)
+ #print 'Writing', frame
+ self.send(frame.marshal())
+
+ def _next_channel_number(self):
+ tries = 0
+ limit = self.state.channel_max or 32767
+ while self.next_channel in self.channels:
+ self.next_channel = (self.next_channel + 1) % limit
+ tries = tries + 1
+ if self.next_channel == 0:
+ self.next_channel = 1
+ if tries > limit:
+ raise NoFreeChannels()
+ return self.next_channel
+
+ def _set_channel(self, channel_number, channel):
+ self.channels[channel_number] = channel
+
+ def _ensure_channel(self, channel_number):
+ if self.connection_close:
+ raise ConnectionClosed(self.connection_close)
+ return self.channels[channel_number]._ensure()
+
+ def reset_channel(self, channel_number):
+ if channel_number in self.channels:
+ del self.channels[channel_number]
+
+ def send_frame(self, frame):
+ self.outbound_frames.append(frame)
+
+ def send_method(self, channel_number, method, content = None):
+ self.send_frame(codec.FrameMethod(channel_number, method))
+ props = None
+ body = None
+ if isinstance(content, tuple):
+ props = content[0]
+ body = content[1]
+ else:
+ body = content
+ if props:
+ length = 0
+ if body: length = len(body)
+ self.send_frame(codec.FrameHeader(channel_number, length, props))
+ if body:
+ maxpiece = (self.state.frame_max - \
+ codec.ConnectionState.HEADER_SIZE - \
+ codec.ConnectionState.FOOTER_SIZE)
+ while body:
+ piecelen = min(len(body), maxpiece)
+ piece = body[:piecelen]
+ body = body[piecelen:]
+ self.send_frame(codec.FrameBody(channel_number, piece))
+
+ def _rpc(self, channel_number, method, acceptable_replies):
+ channel = self._ensure_channel(channel_number)
+ self.send_method(channel_number, method)
+ return channel.wait_for_reply(acceptable_replies)
+
+ def _login1(self, frame):
+ if isinstance(frame, codec.FrameProtocolHeader):
+ raise ProtocolVersionMismatch(self._local_protocol_header,
+ frame)
+
+ response = self.credentials.response_for(frame.method)
+ if not response:
+ raise LoginError("No acceptable SASL mechanism for the given credentials",
+ credentials)
+ self.send_method(0, spec.Connection.StartOk(client_properties = \
+ {"product": "RabbitMQ Python"},
+ mechanism = response[0],
+ response = response[1]))
+ self._erase_credentials()
+ self.frame_handler = self._login2
+
+ def _erase_credentials(self):
+ self.credentials = None
+
+ def _login2(self, frame):
+ channel_max = combine_tuning(self.parameters.channel_max, frame.method.channel_max)
+ frame_max = combine_tuning(self.parameters.frame_max, frame.method.frame_max)
+ self.state.tune(channel_max, frame_max)
+ self.send_method(0, spec.Connection.TuneOk(
+ channel_max = channel_max,
+ frame_max = frame_max,
+ heartbeat = combine_tuning(self.parameters.heartbeat, frame.method.heartbeat)))
+ self.frame_handler = self._generic_frame_handler
+ self._install_channel0()
+ self.known_hosts = \
+ self._rpc(0, spec.Connection.Open(virtual_host = self.virtual_host,
+ insist = True),
+ [spec.Connection.OpenOk]).known_hosts
+ self.connection_open = True
+ self.handle_connection_open()
+
+ def _install_channel0(self):
+ c = channel.ChannelHandler(self, 0)
+ c.async_map[spec.Connection.Close] = self._async_connection_close
+
+ def channel(self):
+ return channel.Channel(channel.ChannelHandler(self))
+
+ def wait_for_open(self):
+ while not self.connection_open and not self.connection_close:
+ asyncore.loop(count = 1)
+
+ def handle_connection_open(self):
+ pass
+
+ def handle_connection_close(self):
+ pass
+
+ def _async_connection_close(self, method_frame, header_frame, body):
+ self._set_connection_close(method_frame.method)
+ self.connection_open = False
+ self.send_method(0, spec.Connection.CloseOk())
+ self.handle_connection_close()
+
+ def _generic_frame_handler(self, frame):
+ #print "GENERIC_FRAME_HANDLER", frame
+ if isinstance(frame, codec.FrameHeartbeat):
+ self.send_frame(frame) # echo the heartbeat
+ else:
+ self.channels[frame.channel_number].frame_handler(frame)
+
+def combine_tuning(a, b):
+ if a == 0: return b
+ if b == 0: return a
+ return min(a, b)
12 rabbitmq/exceptions.py
@@ -0,0 +1,12 @@
+class LoginError(Exception): pass
+class NoFreeChannels(Exception): pass
+
+class ConnectionClosed(Exception): pass
+class ChannelClosed(Exception): pass
+
+class ProtocolSyntaxError(Exception): pass
+class UnexpectedFrameError(ProtocolSyntaxError): pass
+class BodyTooLongError(ProtocolSyntaxError): pass
+class InvalidFrameError(ProtocolSyntaxError): pass
+class InvalidProtocolHeader(ProtocolSyntaxError): pass
+class InvalidTableError(ProtocolSyntaxError): pass
24 rabbitmq/specbase.py
@@ -0,0 +1,24 @@
+class Class:
+ def __repr__(self):
+ return _codec_repr(self, self.__class__)
+
+class Method:
+ def __repr__(self):
+ return _codec_repr(self, self.__class__)
+
+class Properties:
+ def __repr__(self):
+ return _codec_repr(self, self.__class__)
+
+def _codec_repr(o, c):
+ """Returns a repr()esentation of o in the form of a constructor
+ call, including only fields that differ from some default instance
+ constructed by invoking c with no arguments."""
+
+ d = c()
+ n = getattr(o, 'NAME', o.__class__.__name__)
+ return n + "(" + \
+ ", ".join(["%s = %s" % (k, repr(v)) \
+ for (k, v) in o.__dict__.iteritems() \
+ if getattr(d, k, None) != v]) + \
+ ")"
77 rabbitmq/table.py
@@ -0,0 +1,77 @@
+import struct
+import decimal
+import datetime
+import calendar
+
+from rabbitmq.exceptions import *
+
+def encode_table(pieces, table):
+ length_index = len(pieces)
+ pieces.append(None) # placeholder
+ tablesize = 0
+ for (key, value) in table.iteritems():
+ pieces.append(struct.pack('B', len(key)))
+ pieces.append(key)
+ tablesize = tablesize + 1 + len(key)
+ if isinstance(value, str):
+ pieces.append(struct.pack('>cI', 'S', len(value)))
+ pieces.append(value)
+ tablesize = tablesize + 5 + len(value)
+ elif isinstance(value, int):
+ pieces.append(struct.pack('>cI', 'I', value))
+ tablesize = tablesize + 5
+ elif isinstance(value, decimal.Decimal):
+ value = value.normalize()
+ if value._exp < 0:
+ decimals = -value._exp
+ raw = int(value * (decimal.Decimal(10) ** decimals))
+ pieces.append(struct.pack('cB>I', 'D', decimals, raw))
+ else:
+ # per spec, the "decimals" octet is unsigned (!)
+ pieces.append(struct.pack('cB>I', 'D', 0, int(value)))
+ tablesize = tablesize + 5
+ elif isinstance(value, datetime.datetime):
+ pieces.append(struct.pack('>cQ', 'T', calendar.timegm(value.utctimetuple())))
+ tablesize = tablesize + 9
+ elif isinstance(value, dict):
+ tablesize = tablesize + encode_table(pieces, value)
+ else:
+ raise InvalidTableError("Unsupported field kind during encoding", key, value)
+ pieces[length_index] = struct.pack('>I', tablesize)
+ return tablesize + 4
+
+def decode_table(encoded, offset):
+ result = {}
+ tablesize = struct.unpack_from('>I', encoded, offset)[0]
+ offset = offset + 4
+ limit = offset + tablesize
+ while offset < limit:
+ keylen = struct.unpack_from('B', encoded, offset)[0]
+ offset = offset + 1
+ key = encoded[offset : offset + keylen]
+ offset = offset + keylen
+ kind = encoded[offset]
+ offset = offset + 1
+ if kind == 'S':
+ length = struct.unpack_from('>I', encoded, offset)[0]
+ offset = offset + 4
+ value = encoded[offset : offset + length]
+ offset = offset + length
+ elif kind == 'I':
+ value = struct.unpack_from('>I', encoded, offset)[0]
+ offset = offset + 4
+ elif kind == 'D':
+ decimals = struct.unpack_from('B', encoded, offset)[0]
+ offset = offset + 1
+ raw = struct.unpack_from('>I', encoded, offset)[0]
+ offset = offset + 4
+ value = decimal.Decimal(raw) * (decimal.Decimal(10) ** -decimals)
+ elif kind == 'T':
+ value = datetime.datetime.utcfromtimestamp(struct.unpack_from('>Q', encoded, offset)[0])
+ offset = offset + 8
+ elif kind == 'F':
+ (value, offset) = decode_table(encoded, offset)
+ else:
+ raise InvalidTableError("Unsupported field kind %s during decoding" % (kind,))
+ result[key] = value
+ return (result, offset)
18 setup.py
@@ -0,0 +1,18 @@
+from distutils.core import setup
+import os
+
+try:
+ os.stat("rabbitmq/spec.py")
+except:
+ import sys
+ print >> sys.stderr, 'Autogenerated spec.py not found -- run make first!'
+ sys.exit(1)
+
+setup(name='rabbitmq',
+ version='0.1',
+ description='RabbitMQ Python AMQP Client Library',
+ author='Tony Garnock-Jones',
+ author_email='tonyg@rabbitmq.com',
+ url='http://www.rabbitmq.com/',
+ packages=['rabbitmq'],
+ )
Please sign in to comment.
Something went wrong with that request. Please try again.