Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: pika/pika
...
head fork: majek/pika
Checking mergeability… Don't worry, you can still create the pull request.
  • 2 commits
  • 9 files changed
  • 0 commit comments
  • 1 contributor
Commits on Nov 18, 2009
majek Experimental support for blocking basic_publish on channel.flow e00314b
Commits on Nov 25, 2009
majek Send benchmarks 2fb9b0c
View
1  TODO
@@ -1,5 +1,6 @@
Here goes the list of bugs, limitations, ideas and feature requests.
+ o delayed_call is not implemented on blocking_adapter.py.
o We should at least pretend that we test the code. Test coverage
below 80% is a shame.
View
62 examples/demo_channel_flow_asyncore.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python
+'''
+Example show how Pika behaves when channel.flow occurs.
+Two channels are created: producer and consumer.
+
+Producer floods server with huge messages to cause out-of-memory issues.
+Client slowly receives messages.
+
+This example shows that on channel.flow:
+ - client still receives messages.
+ - producer blocks on basic_publish.
+
+Hold on, playing with channel.flow without setting qos is just stupid.
+All the messages will be bufferred on producer.
+'''
+
+import sys
+import pika
+from pika.asyncore_adapter import add_oneshot_timer_rel
+import time
+
+host = sys.argv[1] if len(sys.argv) > 1 else '127.0.0.1'
+credentials = pika.PlainCredentials('guest', 'guest')
+
+body = "a" * 1024*1024*4
+def send(conn, ch):
+ if not conn.writable():
+ for i in range(10):
+ t0 = time.time()
+ ch.basic_publish(exchange='',
+ routing_key="test",
+ body=body)
+ td = time.time() - t0
+ print "basic_publish took %.3fs" % (td,)
+ add_oneshot_timer_rel(0.1, lambda:send(conn, ch))
+
+
+
+conn = pika.AsyncoreConnection(pika.ConnectionParameters(host,
+ credentials=credentials))
+ch_prod = conn.channel()
+ch_prod.queue_declare(queue="test", durable=False, exclusive=False, auto_delete=True)
+add_oneshot_timer_rel(1.0, lambda :send(conn, ch_prod))
+
+
+
+def handle_delivery(ch, method, header, body):
+ def ack():
+ ch.basic_ack(delivery_tag = method.delivery_tag)
+ print "basic_ack"
+ add_oneshot_timer_rel(0.1, ack)
+
+
+ch_cons = conn.channel()
+ch_cons.basic_qos(prefetch_count=1)
+tag = ch_cons.basic_consume(handle_delivery, queue='test')
+
+
+
+
+while True:
+ pika.asyncore_loop()
View
62 examples/demo_channel_flow_poll.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python
+'''
+Example show how Pika behaves when channel.flow occurs.
+Two channels are created: producer and consumer.
+
+Producer floods server with huge messages to cause out-of-memory issues.
+Client slowly receives messages.
+
+This example shows that on channel.flow:
+ - client still receives messages.
+ - producer blocks on basic_publish.
+
+Hold on, playing with channel.flow without setting qos is just stupid.
+All the messages will be bufferred on producer.
+'''
+
+import sys
+import pika
+import time
+
+host = sys.argv[1] if len(sys.argv) > 1 else '127.0.0.1'
+credentials = pika.PlainCredentials('guest', 'guest')
+
+body = "a" * 1024*1024*4
+def send(conn, ch):
+ if not conn.writable():
+ for i in range(10):
+ t0 = time.time()
+ ch.basic_publish(exchange='',
+ routing_key="test",
+ body=body)
+ td = time.time() - t0
+ print "basic_publish took %.3fs" % (td,)
+ reactor.add_oneshot_timer_rel(0.1, lambda:send(conn, ch))
+
+
+reactor = pika.PollReactor()
+
+conn = reactor.Connection(pika.ConnectionParameters(host,
+ credentials=credentials))
+ch_prod = conn.channel()
+ch_prod.queue_declare(queue="test", durable=False, exclusive=False, auto_delete=True)
+reactor.add_oneshot_timer_rel(1.0, lambda :send(conn, ch_prod))
+
+
+
+def handle_delivery(ch, method, header, body):
+ def ack():
+ ch.basic_ack(delivery_tag = method.delivery_tag)
+ print "basic_ack"
+ reactor.add_oneshot_timer_rel(0.1, ack)
+
+
+ch_cons = conn.channel()
+ch_cons.basic_qos(prefetch_count=1)
+tag = ch_cons.basic_consume(handle_delivery, queue='test')
+
+
+
+
+while True:
+ reactor.loop()
View
41 examples/demo_send_asyncore.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python
+import sys
+import os
+import pika
+from pika.asyncore_adapter import add_oneshot_timer_rel
+import time
+
+host = sys.argv[1] if len(sys.argv) > 1 else '127.0.0.1'
+msgsize = (sys.argv[2]) if len(sys.argv) > 2 else 1024*1024*8
+credentials = pika.PlainCredentials('guest', 'guest')
+
+body = "a" * msgsize
+t0 = None
+ct = 20
+def send(conn, ch):
+ global t0, ct
+ if not conn.writable():
+ if t0 != None:
+ if not ct:
+ os.abort()
+ td = time.time() - t0
+ print "basic_publish took %.3fs" % (td,)
+ t0 = time.time()
+ ch.basic_publish(exchange='',
+ routing_key="test",
+ body=body)
+ ct -= 1
+ add_oneshot_timer_rel(0.1, lambda:send(conn, ch))
+
+
+
+
+conn = pika.AsyncoreConnection(pika.ConnectionParameters(host,
+ credentials=credentials))
+ch_prod = conn.channel()
+ch_prod.queue_declare(queue="test", durable=False, exclusive=False, auto_delete=True)
+add_oneshot_timer_rel(1.0, lambda :send(conn, ch_prod))
+
+
+while True:
+ pika.asyncore_loop()
View
41 examples/demo_send_poll.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python
+import sys
+import os
+import pika
+import time
+
+
+host = sys.argv[1] if len(sys.argv) > 1 else '127.0.0.1'
+msgsize = (sys.argv[2]) if len(sys.argv) > 2 else 1024*1024*8
+credentials = pika.PlainCredentials('guest', 'guest')
+
+body = "a" * msgsize
+t0 = None
+ct = 20
+def send(conn, ch):
+ global t0, ct
+ if not conn.writable():
+ if t0 != None:
+ if not ct:
+ os.abort()
+ td = time.time() - t0
+ print "basic_publish took %.3fs" % (td,)
+ t0 = time.time()
+ ch.basic_publish(exchange='',
+ routing_key="test",
+ body=body)
+ ct -= 1
+ reactor.add_oneshot_timer_rel(0.1, lambda:send(conn, ch))
+
+
+reactor = pika.PollReactor()
+
+conn = reactor.Connection(pika.ConnectionParameters(host,
+ credentials=credentials))
+ch_prod = conn.channel()
+ch_prod.queue_declare(queue="test", durable=False, exclusive=False, auto_delete=True)
+reactor.add_oneshot_timer_rel(1.0, lambda :send(conn, ch_prod))
+
+
+while True:
+ reactor.loop()
View
3  pika/__init__.py
@@ -12,3 +12,6 @@
from pika.blocking_adapter import \
BlockingConnection
+
+from pika.poll_adapter import \
+ PollReactor
View
42 pika/channel.py
@@ -2,6 +2,7 @@
import pika.codec as codec
import pika.event as event
from pika.exceptions import *
+import sys
class ChannelHandler:
def __init__(self, connection, channel_number = None):
@@ -10,6 +11,7 @@ def __init__(self, connection, channel_number = None):
self.frame_handler = self._handle_method
self.channel_close = None
self.async_map = {}
+ self.blocking = False
self.channel_state_change_event = event.Event()
@@ -38,11 +40,23 @@ def addStateChangeHandler(self, handler, key = None):
self.channel_state_change_event.addHandler(handler, key)
handler(self, not self.channel_close)
+ def flush_inbound(self):
+ while self.inbound:
+ method_frame, header_frame, body = self.inbound.pop(0)
+ method = method_frame.method
+ if method.__class__ in self.async_map:
+ self.async_map[method.__class__](method_frame, header_frame, body)
+ else:
+ sys.stderr.write("unknown method in this context! %r %r %r\n" \
+ % (method_frame, header_frame, body))
+
def wait_for_reply(self, acceptable_replies):
if not acceptable_replies:
# One-way.
return
index = 0
+
+ self.blocking = True # don't deliver anything.
while True:
self._ensure()
while index >= len(self.inbound):
@@ -52,16 +66,22 @@ def wait_for_reply(self, acceptable_replies):
if isinstance(frame, codec.FrameMethod):
reply = frame.method
if reply.__class__ in acceptable_replies:
- (hframe, body) = self.inbound[index][1:3]
- if hframe is not None:
- reply._set_content(hframe.properties, body)
- self.inbound[index:index+1] = []
- return reply
+ (mframe, hframe, body) = self.inbound[index]
+ self.inbound[index:index+1] = [] # pop that item
+ self.blocking = False
+ self.connection.delayed_call(0.0, self.flush_inbound)
+ if mframe.method.__class__ in self.async_map:
+ self.async_map[mframe.method.__class__](mframe, hframe, body)
+ return
+ else:
+ if hframe is not None:
+ reply._set_content(hframe.properties, body)
+ return reply
index = index + 1
def _handle_async(self, method_frame, header_frame, body):
method = method_frame.method
- if method.__class__ in self.async_map:
+ if method.__class__ in self.async_map and not self.blocking:
self.async_map[method.__class__](method_frame, header_frame, body)
else:
self.inbound.append((method_frame, header_frame, body))
@@ -115,6 +135,7 @@ def __init__(self, handler):
self.handler = handler
self.callbacks = {}
self.next_consumer_tag = 0
+ self.flow_lock = False
handler.async_map[spec.Channel.Close] = handler._async_channel_close
@@ -137,7 +158,11 @@ def _async_basic_return(self, method_frame, header_frame, body):
raise "Unimplemented"
def _async_channel_flow(self, method_frame, header_frame, body):
- raise "Unimplemented"
+ if method_frame.method.active is False:
+ self.flow_lock = True
+ else:
+ self.flow_lock = False
+ sys.stderr.write("channel.flow lock = %r\n" % (self.flow_lock,))
def close(self, code = 0, text = 'Normal close'):
c = spec.Channel.Close(reply_code = code,
@@ -148,6 +173,9 @@ def close(self, code = 0, text = 'Normal close'):
self.handler._set_channel_close(c)
def basic_publish(self, exchange, routing_key, body, properties = None, mandatory = False, immediate = False):
+ while self.flow_lock is True:
+ self.handler.wait_for_reply([spec.Channel.Flow])
+
properties = properties or spec.BasicProperties()
self.handler.connection.send_method(self.handler.channel_number,
spec.Basic.Publish(exchange = exchange,
View
6 pika/connection.py
@@ -247,7 +247,11 @@ def send_frame(self, frame):
marshalled_frame = frame.marshal()
self.bytes_sent = self.bytes_sent + len(marshalled_frame)
self.outbound_buffer.write(marshalled_frame)
- #print 'Wrote %r' % (frame, )
+ if getattr(self, '_do_send', None):
+ self._do_send()
+
+ def writable(self):
+ return bool(self.outbound_buffer)
def send_method(self, channel_number, method, content = None):
self.send_frame(codec.FrameMethod(channel_number, method))
View
150 pika/poll_adapter.py
@@ -0,0 +1,150 @@
+'''
+Asyncore_adapter is not good enough!
+'''
+import sys
+import traceback
+import socket
+import select
+import time
+import heapq
+from errno import EAGAIN
+import pika.connection
+
+
+class PollReactor():
+ def __init__(self):
+ self.timer_heap = []
+ self.poll = select.poll()
+ self.fds = {}
+
+ def register(self, fd, conn):
+ self.poll.register(fd, select.POLLIN)
+ self.fds[fd] = conn
+
+ def unregister(self, fd):
+ self.poll.unregister(fd)
+ del self.fds[fd]
+
+ def register_write(self, fd):
+ self.poll.unregister(fd)
+ self.poll.register(fd, select.POLLIN | select.POLLOUT)
+
+ def unregister_write(self, fd):
+ self.poll.unregister(fd)
+ self.poll.register(fd, select.POLLIN)
+
+ def Connection(self, *args, **kwargs):
+ return _PollConnection(self, *args, **kwargs)
+
+ def add_oneshot_timer_abs(self, firing_time, callback):
+ heapq.heappush(self.timer_heap, (firing_time, callback))
+
+ def add_oneshot_timer_rel(self, firing_delay, callback):
+ self.add_oneshot_timer_abs(time.time() + firing_delay, callback)
+
+ def next_event_timeout(self):
+ cutoff = self.run_timers_internal()
+ if self.timer_heap:
+ timeout = self.timer_heap[0][0] - cutoff
+ else:
+ timeout = 30.0 # default timeout
+ return timeout
+
+ def log_timer_error(self, info):
+ sys.stderr.write('EXCEPTION IN ASYNCORE_ADAPTER TIMER\n')
+ traceback.print_exception(*info)
+
+ def run_timers_internal(self):
+ cutoff = time.time()
+ while self.timer_heap and self.timer_heap[0][0] < cutoff:
+ try:
+ heapq.heappop(self.timer_heap)[1]()
+ except:
+ self.log_timer_error(sys.exc_info())
+ cutoff = time.time()
+ return cutoff
+
+ def loop1(self):
+ if self.fds:
+ fds_events = self.poll.poll(self.next_event_timeout()*1000.0) # ms
+ for fd, event in fds_events:
+ conn = self.fds[fd]
+ if event == select.POLLIN:
+ conn._do_recv()
+ elif event == select.POLLOUT:
+ conn._do_send()
+ else:
+ conn.disconnect_transport()
+ else:
+ time.sleep(self.next_event_timeout())
+
+
+ def loop(self, count = None):
+ if count is None:
+ while self.timer_heap:
+ self.loop1()
+ else:
+ while (self.fds or self.timer_heap) and count > 0:
+ self.loop1()
+ count = count - 1
+ self.run_timers_internal()
+
+
+
+class _PollConnection(pika.connection.Connection):
+ def __init__(self, reactor, *args, **kwargs):
+ self.reactor = reactor
+ self.registered_write = False
+ pika.connection.Connection.__init__(self, *args, **kwargs)
+
+ def delayed_call(self, delay_sec, callback):
+ self.reactor.add_oneshot_timer_rel(delay_sec, callback)
+
+
+ def connect(self, host, port):
+ self.socket = socket.socket()
+ self.socket.connect((host, port)) # yep, this is blocking.
+ self.reactor.register(self.socket.fileno(), self)
+ self.socket.setblocking(False)
+ self.on_connected()
+
+ def disconnect_transport(self):
+ self.reactor.unregister(self.socket.fileno())
+ self.socket.close()
+ self.on_disconnected()
+
+ def drain_events(self):
+ self.reactor.loop(count = 1)
+
+ def _do_recv(self):
+ while True:
+ try:
+ buf = self.socket.recv(self.suggested_buffer_size())
+ except socket.error, (errno, _):
+ if errno == EAGAIN:
+ return
+ else:
+ self.disconnect_transport()
+ return
+ self.on_data_available(buf)
+
+ def _do_send(self):
+ try:
+ while self.outbound_buffer:
+ fragment = self.outbound_buffer.read()
+ r = self.socket.send(fragment)
+ self.outbound_buffer.consume(r)
+ except socket.error, (errno, _):
+ if errno == EAGAIN:
+ if not self.registered_write:
+ self.registered_write = True
+ self.reactor.register_write(self.socket.fileno())
+ return
+ else:
+ self.disconnect_transport()
+ return
+ if self.registered_write:
+ self.registered_write = False
+ self.reactor.unregister_write(self.socket.fileno())
+
+

No commit comments for this range

Something went wrong with that request. Please try again.