Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
branch: master
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

file 113 lines (76 sloc) 3.296 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
from __future__ import with_statement

import os
import puka
import random
import unittest_backport as unittest


AMQP_URL=os.getenv('AMQP_URL', 'amqp:///')

class TestQueue(unittest.TestCase):
    def test_queue_declare(self):
        qname = 'test%s-this-queue-should-be-autodeleted' % (random.random(),)

        client = puka.Client(AMQP_URL)
        promise = client.connect()
        client.wait(promise)

        promise = client.queue_declare(queue=qname, auto_delete=True)
        client.wait(promise)
        # The queue intentionally left hanging. Should be autoremoved.
        # Yes, no assertion here, we don't want to wait for 5 seconds.

    def test_queue_redeclare(self):
        qname = 'test%s' % (random.random(),)

        client = puka.Client(AMQP_URL)
        promise = client.connect()
        client.wait(promise)

        promise = client.queue_declare(queue=qname, auto_delete=False)
        r = client.wait(promise)

        promise = client.queue_declare(queue=qname, auto_delete=False)
        r = client.wait(promise)

        promise = client.queue_declare(queue=qname, auto_delete=True)
        with self.assertRaises(puka.PreconditionFailed):
            client.wait(promise)

        promise = client.queue_delete(queue=qname)
        client.wait(promise)


    def test_queue_redeclare_args(self):
        qname = 'test%s' % (random.random(),)

        client = puka.Client(AMQP_URL)
        promise = client.connect()
        client.wait(promise)

        promise = client.queue_declare(queue=qname, arguments={})
        r = client.wait(promise)

        promise = client.queue_declare(queue=qname, arguments={'x-expires':101})
        with self.assertRaises(puka.PreconditionFailed):
            client.wait(promise)

        promise = client.queue_delete(queue=qname)
        client.wait(promise)


    def test_queue_delete_not_found(self):
        client = puka.Client(AMQP_URL)
        promise = client.connect()
        client.wait(promise)

        promise = client.queue_delete(queue='not_existing_queue')

        with self.assertRaises(puka.NotFound):
            client.wait(promise)


    def test_queue_bind(self):
        qname = 'test%s' % (random.random(),)

        client = puka.Client(AMQP_URL)
        promise = client.connect()
        client.wait(promise)

        t = client.queue_declare(queue=qname)
        client.wait(t)

        t = client.exchange_declare(exchange=qname, type='direct')
        client.wait(t)

        t = client.basic_publish(exchange=qname, routing_key=qname, body='a')
        client.wait(t)

        t = client.queue_bind(exchange=qname, queue=qname, routing_key=qname)
        client.wait(t)

        t = client.basic_publish(exchange=qname, routing_key=qname, body='b')
        client.wait(t)

        t = client.queue_unbind(exchange=qname, queue=qname, routing_key=qname)
        client.wait(t)

        t = client.basic_publish(exchange=qname, routing_key=qname, body='c')
        client.wait(t)

        t = client.basic_get(queue=qname)
        r = client.wait(t)
        self.assertEquals(r['body'], 'b')
        self.assertEquals(r['message_count'], 0)

        t = client.queue_delete(queue=qname)
        client.wait(t)

        t = client.exchange_delete(exchange=qname)
        client.wait(t)

Something went wrong with that request. Please try again.