In [1]:
import pika, functools, threading, time

# Result storage

In [2]:
# Message bodies appended by on_consume, one per delivery.
results = []
# Values passed to on_flow_ok, one per invocation of that callback.
flow = []
# (reason_code, reason_text) tuples appended by on_close.
connection_error = []

# Declare the queue, then immediately send channel.flow(active=False)

In [3]:
def on_open(connection):
    """Connection-open callback for the consumer connection.

    Requests a channel on ``connection``, passing ``on_channel_open`` as the
    callback for that request.
    """
    connection.channel(on_channel_open)

In [4]:
def on_close(connection, reason_code, reason_text):
    """Connection-close callback: remember why the connection went away.

    The ``(reason_code, reason_text)`` pair is appended to the module-level
    ``connection_error`` list. ``connection`` itself is not used.
    """
    close_reason = (reason_code, reason_text)
    connection_error.append(close_reason)

In [5]:
def on_channel_open(channel):
    """Channel-open callback: declare the exclusive queue ``'foo'``.

    ``on_declare_ok`` is bound to this channel via ``functools.partial`` and
    handed to ``queue_declare`` as its callback, so the declare handler
    receives the channel as a keyword argument.
    """
    declare_callback = functools.partial(on_declare_ok, channel=channel)
    channel.queue_declare(declare_callback, queue='foo', exclusive=True)

In [6]:
def on_declare_ok(method_frame, channel):
    """Queue-declare callback: start consuming, then send flow(active=False).

    ``method_frame`` is not used. ``channel`` arrives by keyword through the
    ``functools.partial`` built in ``on_channel_open``.
    """
    # Register on_consume as the consumer callback for queue 'foo'.
    channel.basic_consume(on_consume, queue='foo')
    # Request active=False, with on_flow_ok passed as the callback.
    # NOTE(review): the output recorded at the end of this notebook shows the
    # connection closing with 540 'NOT_IMPLEMENTED - active=false', so this
    # call appears to be what the broker rejects -- confirm.
    channel.flow(on_flow_ok, active=False)

In [7]:
def on_consume(channel, method_frame, properties, body):
    """Consumer callback: acknowledge the delivery, then keep its body.

    The delivery tag is read from ``method_frame`` and acknowledged on
    ``channel`` first; afterwards ``body`` is appended to the module-level
    ``results`` list. ``properties`` is not used.
    """
    delivery_tag = method_frame.delivery_tag
    channel.basic_ack(delivery_tag)
    results.append(body)

In [8]:
def on_flow_ok(active):
    """Callback passed to ``channel.flow``: record the ``active`` value.

    Appends ``active`` to the module-level ``flow`` list.
    """
    flow.append(active)

In [9]:
# Consumer connection: on_open and on_close are registered as its open and
# close callbacks.
# NOTE(review): host and guest credentials are hard-coded in the URL.
connection = pika.SelectConnection(
    pika.URLParameters('amqp://guest:guest@rabbitmq:5672/%2F'),
    on_open_callback=on_open,
    on_close_callback=on_close,
)

In [10]:
# Run the consumer connection's ioloop on a background thread so the cell
# returns instead of blocking in ioloop.start.
thread = threading.Thread(target=connection.ioloop.start)
# Daemon thread: it will not keep the interpreter alive on exit.
thread.daemon = True
thread.start()

# Publish to this queue from a second connection

In [11]:
# Pause one second before setting up the publisher.
# NOTE(review): presumably gives the consumer connection time to open and
# declare the queue; a fixed sleep does not guarantee that -- confirm.
time.sleep(1)

In [12]:
def on_open2(connection):
    """Connection-open callback for the publisher connection.

    Requests a channel on ``connection``, passing ``on_channel_open2`` as the
    callback for that request.
    """
    connection.channel(on_channel_open2)

In [13]:
def on_channel_open2(channel):
    """Channel-open callback for the publisher: send ten test messages.

    Publishes ``'Test Message 0'`` through ``'Test Message 9'``, in order, to
    the default exchange (``''``) with routing key ``'foo'``.
    """
    payloads = ['Test Message %d' % index for index in range(10)]
    for payload in payloads:
        channel.basic_publish(exchange='', routing_key='foo', body=payload)

In [14]:
# Publisher connection: on_open2 is its open callback; no close callback is
# registered here.
# NOTE(review): host and guest credentials are hard-coded in the URL.
connection2 = pika.SelectConnection(
    pika.URLParameters('amqp://guest:guest@rabbitmq:5672/%2F'),
    on_open_callback=on_open2,
)

In [15]:
# Run the publisher connection's ioloop on its own background thread.
thread2 = threading.Thread(target=connection2.ioloop.start)
# Daemon thread: it will not keep the interpreter alive on exit.
thread2.daemon = True
thread2.start()

# Check results.

In [16]:
# Pause two seconds before inspecting the result lists.
# NOTE(review): presumably allows the background ioloops time to run their
# callbacks; a fixed sleep does not guarantee that -- confirm.
time.sleep(2)

In [17]:
# Message bodies collected by on_consume.
results

[]

In [18]:
# Values collected by on_flow_ok.
flow

[]

In [19]:
# (reason_code, reason_text) pairs collected by on_close.
connection_error

[(540, 'NOT_IMPLEMENTED - active=false')]