Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Channel FSM, accept as a context manager #80

Merged
merged 2 commits into from Apr 24, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
170 changes: 140 additions & 30 deletions pyon/net/channel.py
Expand Up @@ -42,8 +42,9 @@
from pika import BasicProperties
from gevent import queue as gqueue
from contextlib import contextmanager
from gevent.event import AsyncResult
from gevent.event import AsyncResult, Event
from pyon.net.transport import AMQPTransport, NameTrio
from pyon.util.fsm import FSM

class ChannelError(StandardError):
"""
Expand Down Expand Up @@ -72,6 +73,13 @@ class BaseChannel(object):
_exchange_auto_delete = True
_exchange_durable = False

# States, Inputs for FSM
S_INIT = 'INIT'
S_ACTIVE = 'ACTIVE'
S_CLOSED = 'CLOSED'
I_ATTACH = 'ATTACH'
I_CLOSE = 'CLOSE'

def __init__(self, transport=None, close_callback=None):
"""
Initializes a BaseChannel instance.
Expand All @@ -86,6 +94,13 @@ def __init__(self, transport=None, close_callback=None):
self.set_close_callback(close_callback)
self._transport = transport or AMQPTransport.get_instance()

# setup FSM for BaseChannel / SendChannel tree
self._fsm = FSM(self.S_INIT)
self._fsm.add_transition(self.I_ATTACH, self.S_INIT, None, self.S_ACTIVE)
self._fsm.add_transition(self.I_CLOSE, self.S_ACTIVE, self._on_close, self.S_CLOSED)
self._fsm.add_transition(self.I_CLOSE, self.S_CLOSED, None, self.S_CLOSED) # closed is a goal state, multiple closes are ok and are no-ops
self._fsm.add_transition(self.I_CLOSE, self.S_INIT, None, self.S_CLOSED) # INIT to CLOSED is fine too

def set_close_callback(self, close_callback):
"""
Sets a callback method to be called when this channel's close method is invoked.
Expand Down Expand Up @@ -133,6 +148,7 @@ def attach_underlying_channel(self, amq_chan):
Attaches an AMQP channel and indicates this channel is now open.
"""
self._amq_chan = amq_chan
self._fsm.process(self.I_ATTACH)

def get_channel_id(self):
"""
Expand All @@ -147,7 +163,13 @@ def get_channel_id(self):

def close(self):
"""
Default close method.
Default public close method.
"""
self._fsm.process(self.I_CLOSE)

def _on_close(self, fsm):
"""
FSM action method for going to the CLOSED state.

If a close callback was specified when creating this instance, it will call that,
otherwise it calls close_impl.
Expand Down Expand Up @@ -288,7 +310,7 @@ def send(self, data, headers=None):
self._send(self._send_name, data, headers=headers)

def _send(self, name, data, headers=None):
log.debug("SendChannel._send\n\tname: %s\n\tdata: %s\n\theaders: %s", name, data, headers)
log.debug("SendChannel._send\n\tname: %s\n\tdata: %s\n\theaders: %s", name, "-", headers)
exchange = name.exchange
routing_key = name.binding # uses "_queue" if binding not explictly defined
headers = headers or {}
Expand All @@ -309,7 +331,6 @@ class RecvChannel(BaseChannel):
"""
# data for this receive channel
_recv_queue = None
_consuming = False
_consumer_tag = None
_recv_name = None # name this receiving channel is receiving on - tuple (exchange, queue)
_recv_binding = None # binding this queue is listening on (set via _bind)
Expand All @@ -323,6 +344,11 @@ class RecvChannel(BaseChannel):
_consumer_exclusive = False
_consumer_no_ack = False # endpoint layers do the acking as they call recv()

# RecvChannel specific FSM states, inputs
S_CONSUMING = 'CONSUMING'
I_START_CONSUME = 'START_CONSUME'
I_STOP_CONSUME = 'STOP_CONSUME'

def __init__(self, name=None, binding=None, **kwargs):
"""
Initializer for a recv channel.
Expand All @@ -341,9 +367,10 @@ def __init__(self, name=None, binding=None, **kwargs):

BaseChannel.__init__(self, **kwargs)

def on_channel_close(self, code, text):
BaseChannel.on_channel_close(self, code, text)
self._consuming = False
# setup RecvChannel specific state transitions
self._fsm.add_transition(self.I_START_CONSUME, self.S_ACTIVE, self._on_start_consume, self.S_CONSUMING)
self._fsm.add_transition(self.I_STOP_CONSUME, self.S_CONSUMING, self._on_stop_consume, self.S_ACTIVE)
self._fsm.add_transition(self.I_CLOSE, self.S_CONSUMING, self._on_close_while_consume, self.S_CLOSED)

def setup_listener(self, name=None, binding=None):
"""
Expand Down Expand Up @@ -429,9 +456,15 @@ def start_consume(self):

setup_listener must have been called first.
"""
log.debug("RecvChannel.start_consume")
if self._consuming:
raise ChannelError("Already consuming")
self._fsm.process(self.I_START_CONSUME)

def _on_start_consume(self, fsm):
"""
Starts consuming messages.

setup_listener must have been called first.
"""
log.debug("RecvChannel._on_start_consume")

if self._consumer_tag and self._queue_auto_delete:
log.warn("Attempting to start consuming on a queue that may have been auto-deleted")
Expand All @@ -442,38 +475,63 @@ def start_consume(self):
queue=self._recv_name.queue,
no_ack=self._consumer_no_ack,
exclusive=self._consumer_exclusive)
self._consuming = True

def stop_consume(self):
"""
Stops consuming messages.

If the queue has auto_delete, this will delete it.
"""
log.debug("RecvChannel.stop_consume")
if not self._consuming:
raise ChannelError("Not consuming")
self._fsm.process(self.I_STOP_CONSUME)

def _on_stop_consume(self, fsm):
"""
Stops consuming messages.

If the queue has auto_delete, this will delete it.
"""
log.debug("RecvChannel._on_stop_consume")

if self._queue_auto_delete:
log.debug("Autodelete is on, this will destroy this queue: %s", self._recv_name.queue)

self._ensure_amq_chan()

self._sync_call(self._amq_chan.basic_cancel, 'callback', self._consumer_tag)
self._consuming = False

def _on_close_while_consume(self, fsm):
"""
Handles the case where close is issued on a consuming channel.
"""
self._fsm.process_list([self.I_STOP_CONSUME, self.I_CLOSE])

def recv(self):
"""
Pulls a message off the queue, will block if there are none.
Typically done by the EndpointUnit layer. Should ack the message there as it is "taking ownership".
"""
msg = self._recv_queue.get()
while True:
msg = self._recv_queue.get()

# spin on non-closed messages until we get to what we are waiting for
if self._should_discard and not isinstance(msg, ChannelShutdownMessage):
log.debug("RecvChannel.recv: discarding non-shutdown message while in closing/closed state")
continue

# how we handle closed/closing calls, not the best @TODO
if isinstance(msg, ChannelShutdownMessage):
raise ChannelClosedError('Attempt to recv on a channel that is being closed.')

return msg

# how we handle closed/closing calls, not the best @TODO
if isinstance(msg, ChannelShutdownMessage):
raise ChannelClosedError('Attempt to recv on a channel that is being closed.')
@property
def _should_discard(self):
"""
If the recv loop should discard any messages, aka while closed or closing.

return msg
This method's existence is an implementation detail, to be overridden by derived classes
that may define different closed or closing states.
"""
return self._fsm.current_state == self.S_CLOSED

def close_impl(self):
"""
Expand All @@ -482,11 +540,7 @@ def close_impl(self):
If we've declared and we're not auto_delete, must delete here.
Also put a ChannelShutdownMessage in the recv queue so anything blocking on reading it will get notified via ChannelClosedError.
"""
# stop consuming if we are consuming
log.debug("RecvChannel.close_impl (%s): consuming %s", self.get_channel_id(), self._consuming)

if self._consuming:
self.stop_consume()
log.debug("RecvChannel.close_impl (%s)", self.get_channel_id())

self._recv_queue.put(ChannelShutdownMessage())

Expand Down Expand Up @@ -527,14 +581,15 @@ def _bind(self, binding):
self._recv_binding = binding

def _on_deliver(self, chan, method_frame, header_frame, body):
log.debug("RecvChannel._on_deliver")

consumer_tag = method_frame.consumer_tag # use to further route?
delivery_tag = method_frame.delivery_tag # use to ack
redelivered = method_frame.redelivered
exchange = method_frame.exchange
routing_key = method_frame.routing_key

log.debug("RecvChannel._on_deliver, tag: %s, cur recv_queue len %d", delivery_tag, self._recv_queue.qsize())

# put body, headers, delivery tag (for acking) in the recv queue
self._recv_queue.put((body, header_frame.headers, delivery_tag))

Expand Down Expand Up @@ -599,6 +654,13 @@ class ListenChannel(RecvChannel):
to interact and returns it
"""

# States, Inputs for ListenChannel FSM
S_STOPPING = 'STOPPING'
S_CLOSING = 'CLOSING'
S_ACCEPTED = 'ACCEPTED'
I_ENTER_ACCEPT = 'ENTER_ACCEPT'
I_EXIT_ACCEPT = 'EXIT_ACCEPT'

class AcceptedListenChannel(RecvChannel):
"""
The type of channel returned by accept.
Expand All @@ -609,14 +671,59 @@ def close_impl(self):
"""
pass

def __init__(self, name=None, binding=None, **kwargs):
RecvChannel.__init__(self, name=name, binding=binding, **kwargs)

# setup ListenChannel specific state transitions
self._fsm.add_transition(self.I_ENTER_ACCEPT, self.S_CONSUMING, None, self.S_ACCEPTED)
self._fsm.add_transition(self.I_EXIT_ACCEPT, self.S_ACCEPTED, None, self.S_CONSUMING)
self._fsm.add_transition(self.I_CLOSE, self.S_ACCEPTED, None, self.S_CLOSING)
self._fsm.add_transition(self.I_EXIT_ACCEPT, self.S_CLOSING, self._on_close_while_accepted, self.S_CLOSED)
self._fsm.add_transition(self.I_STOP_CONSUME, self.S_ACCEPTED, None, self.S_STOPPING)
self._fsm.add_transition(self.I_EXIT_ACCEPT, self.S_STOPPING, self._on_stop_consume_while_accepted, self.S_ACTIVE)

def _create_accepted_channel(self, amq_chan, msg):
"""
Creates an AcceptedListenChannel.

Can be overridden by derived classes to get custom class types/functionality.
"""
ch = self.AcceptedListenChannel()
ch.attach_underlying_channel(amq_chan)
return ch

@property
def _should_discard(self):
"""
If the recv loop should discard any messages, aka while closed or closing.

This method's existence is an implementation detail, to be overridden by derived classes
that may define different closed or closing states.
"""
return self._fsm.current_state == self.S_CLOSED or self._fsm.current_state == self.S_CLOSING

def _on_close_while_accepted(self, fsm):
"""
Handles the delayed closing of a channel after the accept context has been exited.
"""
self._on_stop_consume(fsm)
self._on_close(fsm)

def _on_stop_consume_while_accepted(self, fsm):
"""
Handles the delayed consumer stop of a channel after the accept context has been exited.
"""
self._on_stop_consume(fsm)

@contextmanager
def accept(self):
"""
@returns A new channel that can:
Context manager method to accept new connections for listening endpoints.

Defers calls to close during handling of the message, to be called after
the context manager returns.

Yields A new channel that can:
- takes a copy of the underlying transport channel
- send() aka reply
- close without closing underlying transport channel
Expand All @@ -625,12 +732,15 @@ def accept(self):
- has the initial received message here put onto its recv gqueue
- recv() returns messages in its gqueue, endpoint should ack
"""
# self._ensure_amq_chan()
# self._ensure_amq_chan()
m = self.recv()
ch = self._create_accepted_channel(self._amq_chan, m)
ch._recv_queue.put(m) # prime our recieved message here, should be acked by EP layer

return ch
self._fsm.process(self.I_ENTER_ACCEPT)
yield ch
self._fsm.process(self.I_EXIT_ACCEPT)


class SubscriberChannel(ListenChannel):
pass
Expand Down
53 changes: 39 additions & 14 deletions pyon/net/endpoint.py
Expand Up @@ -423,27 +423,52 @@ def listen(self, binding=None):
self._ready_event.set()

while True:
log.debug("LEF: %s blocking, waiting for a message" % str(self._recv_name))
log.debug("LEF: %s blocking, waiting for a message", self._recv_name)
try:
newchan = self._chan.accept()
msg, headers, delivery_tag = newchan.recv()
with self._chan.accept() as newchan:
msg, headers, delivery_tag = newchan.recv()

log.debug("LEF %s received message %s, headers %s, delivery_tag %s", self._recv_name, msg, headers, delivery_tag)
log_message(self._recv_name, msg, headers, delivery_tag)
log.debug("LEF %s received message %s, headers %s, delivery_tag %s", self._recv_name, "-", headers, delivery_tag)
log_message(self._recv_name, msg, headers, delivery_tag)

try:
e = self.create_endpoint(existing_channel=newchan)
e._message_received(msg, headers)
except Exception:
log.exception("Unhandled error while handling received message")
raise
finally:
# ALWAYS ACK
newchan.ack(delivery_tag)

except ChannelClosedError as ex:
log.debug('Channel was closed during LEF.listen')
break

try:
e = self.create_endpoint(existing_channel=newchan)
e._message_received(msg, headers)
except Exception:
log.exception("Unhandled error while handling received message")
raise
finally:
# ALWAYS ACK
newchan.ack(delivery_tag)


# while True:
# log.debug("LEF: %s blocking, waiting for a message", self._recv_name)
# try:
# newchan = self._chan.accept()
# msg, headers, delivery_tag = newchan.recv()
#
# log.debug("LEF %s received message %s, headers %s, delivery_tag %s", self._recv_name, "-", headers, delivery_tag)
# log_message(self._recv_name, msg, headers, delivery_tag)
#
# except ChannelClosedError as ex:
# log.debug('Channel was closed during LEF.listen')
# break
#
# try:
# e = self.create_endpoint(existing_channel=newchan)
# e._message_received(msg, headers)
# except Exception:
# log.exception("Unhandled error while handling received message")
# raise
# finally:
# # ALWAYS ACK
# newchan.ack(delivery_tag)

def close(self):
BaseEndpoint.close(self)
Expand Down
1 change: 0 additions & 1 deletion pyon/net/messaging.py
Expand Up @@ -94,7 +94,6 @@ def on_channel_request_close(self, ch):

assert ch.get_channel_id() in self._pool_map
with self._lock:
ch.stop_consume()
chid = self._pool_map.pop(ch.get_channel_id())
log.debug("Releasing BiDir pool Pika #%d, our id #%d", ch.get_channel_id(), chid)
self._pool.release_id(chid)
Expand Down