Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Project start

  • Loading branch information...
commit bed8939a38c7f0686bca3c784f0f111d73a62448 0 parents
@majek authored
3  LICENSE
@@ -0,0 +1,3 @@
+Unless differently stated files in this package are licensed under the
+MIT License. For the MIT License please see LICENSE-MIT-Puka.
+
19 LICENSE-MIT-Puka
@@ -0,0 +1,19 @@
+Copyright (c) 2010 Marek Majkowski
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
125 README.md
@@ -0,0 +1,125 @@
+Puka - the opinionated RabbitMQ client
+======================================
+
+Puka is yet-another Python client library for RabbitMQ. But as opposed
+to similar libraries, it does not try to expose a generic AMQP
+API. Instead, it takes an opinionated view on how the user should
+interact with RabbitMQ.
+
+
+Puka is simple
+--------------
+
+Puka exposes a simple, easy to understand API. Take a look at the
+`publisher` example:
+
+ import puka
+
+ client = puka.Puka("amqp://localhost/")
+
+ ticket = client.connect()
+ client.wait(ticket)
+
+ ticket = client.queue_declare(queue='test')
+ client.wait(ticket)
+
+ ticket = client.basic_publish(exchange='', routing_key='test',
+ body='Hello world!')
+ client.wait(ticket)
+
+
+Puka is asynchronous
+--------------------
+
+Puka by all means is asynchronous. Although, as you can see in example
+above, it can behave synchronously. That's especially useful for
+simple tasks when you don't want to introduce complex callbacks.
+
+Here's the same code written in an asynchronous way:
+
+ import puka
+ import sys
+
+ def run():
+ client = puka.Puka("amqp://localhost/")
+
+ client.connect(callback=on_connection)
+
+ def on_connection(result):
+ client.queue_declare(queue='test', callback=on_queue_declare)
+
+ def on_queue_declare(result):
+ client.basic_publish(exchange='', routing_key='test',
+ body="Hello world!",
+ callback=on_basic_publish)
+
+ def on_basic_publish(result):
+ sys.exit(0)
+
+ client.wait_forever()
+
+ run()
+
+
+Puka never blocks
+-----------------
+
+In the pure asynchronous programming style Puka never blocks your
+program waiting for network. On the other hand it is your
+responsibility to notify when new data is available on the network
+socket. To allow that Puka allows you to access the raw socket
+descriptor. With that in hand you can construct your own event
+loop. Here's an the event loop that may replace `wait_forever` from
+previous example:
+
+ fd = client.fileno()
+ while True:
+ while client.handle_data():
+ pass
+
+ r, w, e = select.select([fd],[fd] if client.needs_write() else [], [fd])
+ if r or e:
+ client.on_read()
+ if w:
+ client.on_write()
+
+
+Puka is fast
+------------
+
+Puka is asynchronous and has no trouble in handling many requests at a
+time. This can be exploited to achieve a degree of parallelism. For
+example, this snippet creates 1000 queues in parallel:
+
+ tickets = [puka.queue_declare(queue='a%04i' % i) for i in range(1000)]
+ pork.wait_for_many(tickets)
+
+
+Puka is also created to be a flat library, with only a few necessary
+indirection layers underneath. That does effect in a pretty low CPU
+usage.
+
+
+Puka is sane
+------------
+
+Puka does expose only a sane subset of AMQP, as judged by the
+author. The major differences between Puka and raw AMQP are:
+
+ - Channels and transactions aren't exposed at all.
+ - Properties and headers are exposed as a one thing.
+ - Deleting queues 'on broker restart' is not allowed. Queues can be
+ only `auto_delete` or `persistent`.
+ - Mystic 'delivery-mode' property is renamed to `persistent` and by default
+ all outgoing messages are marked as `persistent`.
+ - Everything is made to be synchronous, even 'basic_publish' and 'basic_ack'.
+
+
+
+Puka is experimental
+--------------------
+
+Puka is a side project, written mostly to prove if it is possible to
+create a reasonable API on top of the AMQP protocol. It is not
+supported by anyone and might be abandoned at any time.
+
27 examples/receive.py
@@ -0,0 +1,27 @@
+#!/usr/bin/env python
+
+import os
+import sys
+sys.path.append(os.path.join("..", "puka"))
+
+
+import puka
+
+
+client = puka.Puka("amqp://localhost/")
+ticket = client.connect()
+client.wait(ticket)
+
+ticket = client.queue_declare(queue='test')
+client.wait(ticket)
+
+print " [*] Waiting for messages. Press CTRL+C to quit."
+
+consume_ticket = client.basic_consume(queue='test')
+while True:
+ result = client.wait(consume_ticket)
+ print " [x] Received message %r" % (result,)
+
+ client.basic_ack(result)
+
+
27 examples/receive_one.py
@@ -0,0 +1,27 @@
+#!/usr/bin/env python
+
+import os
+import sys
+sys.path.append(os.path.join("..", "puka"))
+
+
+import puka
+
+
+client = puka.Puka("amqp://localhost/")
+ticket = client.connect()
+client.wait(ticket)
+
+ticket = client.queue_declare(queue='test')
+client.wait(ticket)
+
+print " [*] Waiting for a message. Press CTRL+C to quit."
+
+consume_ticket = client.basic_consume(queue='test')
+result = client.wait(consume_ticket)
+print " [x] Received message %r" % (result,)
+
+client.basic_ack(result)
+
+client.cancel(consume_ticket)
+client.wait(consume_ticket)
23 examples/send.py
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+
+import os
+import sys
+sys.path.append(os.path.join("..", "puka"))
+
+
+import puka
+
+
+client = puka.Puka("amqp://localhost/")
+ticket = client.connect()
+client.wait(ticket)
+
+ticket = client.queue_declare(queue='test')
+client.wait(ticket)
+
+ticket = client.basic_publish(exchange='', routing_key='test',
+ body="Hello world!")
+client.wait(ticket)
+
+print " [*] Message sent"
+
0  puka/__init__.py
No changes.
128 puka/table.py
@@ -0,0 +1,128 @@
+# ***** BEGIN LICENSE BLOCK *****
+# Version: MPL 1.1/GPL 2.0
+#
+# The contents of this file are subject to the Mozilla Public License
+# Version 1.1 (the "License"); you may not use this file except in
+# compliance with the License. You may obtain a copy of the License at
+# http://www.mozilla.org/MPL/
+#
+# Software distributed under the License is distributed on an "AS IS"
+# basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+# the License for the specific language governing rights and
+# limitations under the License.
+#
+# The Original Code is Pika.
+#
+# The Initial Developers of the Original Code are LShift Ltd, Cohesive
+# Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+# created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+# Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+# (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+# Rabbit Technologies Ltd.
+#
+# Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+# Ltd. Portions created by Cohesive Financial Technologies LLC are
+# Copyright (C) 2007-2009 Cohesive Financial Technologies
+# LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+# 2007-2009 Rabbit Technologies Ltd.
+#
+# Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+# LShift Ltd and Tony Garnock-Jones.
+#
+# All Rights Reserved.
+#
+# Contributor(s): ______________________________________.
+#
+# Alternatively, the contents of this file may be used under the terms
+# of the GNU General Public License Version 2 or later (the "GPL"), in
+# which case the provisions of the GPL are applicable instead of those
+# above. If you wish to allow use of your version of this file only
+# under the terms of the GPL, and not to allow others to use your
+# version of this file under the terms of the MPL, indicate your
+# decision by deleting the provisions above and replace them with the
+# notice and other provisions required by the GPL. If you do not
+# delete the provisions above, a recipient may use your version of
+# this file under the terms of any one of the MPL or the GPL.
+#
+# ***** END LICENSE BLOCK *****
+
+import struct
+import decimal
+import datetime
+import calendar
+
+from pika.exceptions import *
+
+def encode_table(pieces, table):
+ if table is None:
+ table = {}
+ length_index = len(pieces)
+ pieces.append(None) # placeholder
+ tablesize = 0
+ for (key, value) in table.iteritems():
+ pieces.append(struct.pack('B', len(key)))
+ pieces.append(key)
+ tablesize = tablesize + 1 + len(key)
+ if isinstance(value, str):
+ pieces.append(struct.pack('>cI', 'S', len(value)))
+ pieces.append(value)
+ tablesize = tablesize + 5 + len(value)
+ elif isinstance(value, int):
+ pieces.append(struct.pack('>cI', 'I', value))
+ tablesize = tablesize + 5
+ elif isinstance(value, decimal.Decimal):
+ value = value.normalize()
+ if value._exp < 0:
+ decimals = -value._exp
+ raw = int(value * (decimal.Decimal(10) ** decimals))
+ pieces.append(struct.pack('cB>I', 'D', decimals, raw))
+ else:
+ # per spec, the "decimals" octet is unsigned (!)
+ pieces.append(struct.pack('cB>I', 'D', 0, int(value)))
+ tablesize = tablesize + 5
+ elif isinstance(value, datetime.datetime):
+ pieces.append(struct.pack('>cQ', 'T', calendar.timegm(value.utctimetuple())))
+ tablesize = tablesize + 9
+ elif isinstance(value, dict):
+ pieces.append(struct.pack('>c', 'F'))
+ tablesize = tablesize + 1 + encode_table(pieces, value)
+ else:
+ raise InvalidTableError("Unsupported field kind during encoding", key, value)
+ pieces[length_index] = struct.pack('>I', tablesize)
+ return tablesize + 4
+
+def decode_table(encoded, offset):
+ result = {}
+ tablesize = struct.unpack_from('>I', encoded, offset)[0]
+ offset = offset + 4
+ limit = offset + tablesize
+ while offset < limit:
+ keylen = struct.unpack_from('B', encoded, offset)[0]
+ offset = offset + 1
+ key = encoded[offset : offset + keylen]
+ offset = offset + keylen
+ kind = encoded[offset]
+ offset = offset + 1
+ if kind == 'S':
+ length = struct.unpack_from('>I', encoded, offset)[0]
+ offset = offset + 4
+ value = encoded[offset : offset + length]
+ offset = offset + length
+ elif kind == 'I':
+ value = struct.unpack_from('>I', encoded, offset)[0]
+ offset = offset + 4
+ elif kind == 'D':
+ decimals = struct.unpack_from('B', encoded, offset)[0]
+ offset = offset + 1
+ raw = struct.unpack_from('>I', encoded, offset)[0]
+ offset = offset + 4
+ value = decimal.Decimal(raw) * (decimal.Decimal(10) ** -decimals)
+ elif kind == 'T':
+ value = datetime.datetime.utcfromtimestamp(struct.unpack_from('>Q', encoded, offset)[0])
+ offset = offset + 8
+ elif kind == 'F':
+ (value, offset) = decode_table(encoded, offset)
+ else:
+ raise InvalidTableError("Unsupported field kind %s during decoding" % (kind,))
+ result[key] = value
+ return (result, offset)
Please sign in to comment.
Something went wrong with that request. Please try again.