Permalink
Browse files

Split ActorProxy into ActorProxy and ActorRef. Lots of refactoring. A…

…ll tests green.
  • Loading branch information...
1 parent 2e75935 commit 34602253fefcc96fcf6afcdc97c1351325315601 @jodal committed Feb 13, 2011
View
@@ -1,17 +1,39 @@
+import gevent.monkey
+gevent.monkey.patch_all()
+
from pykka.actor import Actor
-from pykka.future import Future, get_all, wait_all
from pykka.proxy import ActorProxy, CallableProxy
+from pykka.ref import ActorRef
from pykka.registry import ActorRegistry
-__all__ = ['Actor', 'ActorProxy', 'ActorRegistry', 'CallableProxy', 'Future',
+__all__ = ['Actor', 'ActorProxy', 'ActorRef', 'ActorRegistry', 'CallableProxy',
'get_all', 'wait_all']
VERSION = (0, 5)
def get_version():
+ """Returns a formatted version number"""
version = '%s.%s' % (VERSION[0], VERSION[1])
if len(VERSION) > 2:
version = '%s.%s' % (version, VERSION[2])
return version
+
+def get_all(results, timeout=None):
+ """
+ Get all values encapsulated in the list of
+ :class:`gevent.event.AsyncResult`.
+
+ :attr:`timeout` has the same behaviour as for :meth:`Future.get`.
+ """
+ return map(lambda result: result.get(timeout=timeout), results)
+
+def wait_all(results, timeout=None):
+ """
+ Block until all :class:`gevent.event.AsyncResult` in the list are avaiable.
+
+ An alias for :func:`get_all`, but with a name that is more describing if
+ you do not care about the return values.
+ """
+ return get_all(results, timeout)
View
@@ -1,10 +1,9 @@
import gevent
import gevent.queue
import logging
-import sys
import uuid
-from pykka.proxy import ActorProxy
+from pykka.ref import ActorRef
from pykka.registry import ActorRegistry
logger = logging.getLogger('pykka')
@@ -46,6 +45,11 @@ class Actor(gevent.Greenlet):
To stop an actor, call :meth:`Actor.stop()`.
"""
+ actor_urn = None
+ actor_inbox = None
+ actor_ref = None
+ actor_runnable = True
+
@classmethod
def start(cls, *args, **kwargs):
"""
@@ -54,8 +58,8 @@ def start(cls, *args, **kwargs):
Any arguments passed to :meth:`start` will be passed on to the class
constructor.
- Returns a :class:`ActorProxy` which can be used to access the actor in
- a safe manner.
+ Returns a :class:`ActorRef` which can be used to access the actor in a
+ safe manner.
Behind the scenes, the following is happening when you call
:meth:`start`::
@@ -66,73 +70,72 @@ def start(cls, *args, **kwargs):
Greenlet.__init__()
UUID assignment
Inbox creation
- Proxy creation
+ ActorRef creation
Actor.__init__() # Your code can run here
Greenlet.start()
ActorRegistry.register()
"""
obj = cls(*args, **kwargs)
- super(Actor, obj).start()
+ gevent.Greenlet.start(obj)
logger.debug(u'Started %s', obj)
- ActorRegistry.register(obj._actor_proxy)
- return obj._actor_proxy
+ ActorRegistry.register(obj.actor_ref)
+ return obj.actor_ref
def __new__(cls, *args, **kwargs):
- obj = super(Actor, cls).__new__(cls, *args, **kwargs)
- super(Actor, obj).__init__()
- obj._actor_urn = uuid.uuid4().urn
- obj._actor_inbox = gevent.queue.Queue()
- obj._actor_proxy = ActorProxy(obj)
+ obj = gevent.Greenlet.__new__(cls, *args, **kwargs)
+ gevent.Greenlet.__init__(obj)
+ obj.actor_urn = uuid.uuid4().urn
+ obj.actor_inbox = gevent.queue.Queue()
+ obj.actor_ref = ActorRef(obj)
return obj
+ # pylint: disable=W0231
def __init__(self):
"""
Your are free to override :meth:`__init__` and do any setup you need to
do. You should not call ``super(YourClass, self).__init__(...)``, as
that has already been done when your constructor is called.
When :meth:`__init__` is called, the internal fields
- :attr:`_actor_urn`, :attr:`_actor_inbox`, and :attr:`_actor_proxy` are
+ :attr:`actor_urn`, :attr:`actor_inbox`, and :attr:`actor_ref` are
already set, but the actor is not started or registered in
:class:`ActorRegistry`.
"""
pass
+ # pylint: enable=W0231
def __str__(self):
return '%(class)s (%(urn)s)' % {
'class': self.__class__.__name__,
- 'urn': self._actor_urn,
+ 'urn': self.actor_urn,
}
def stop(self):
"""
- Stop the actor and terminate its thread.
+ Stop the actor.
The actor will finish processing any messages already in its queue
- before stopping.
+ before stopping. It may not be restarted.
"""
- self.runnable = False
- ActorRegistry.unregister(self._actor_proxy)
+ self.actor_runnable = False
+ ActorRegistry.unregister(self.actor_ref)
logger.debug(u'Stopped %s', self)
def _run(self):
- self.runnable = True
- try:
- while self.runnable:
- self._event_loop()
- except KeyboardInterrupt:
- sys.exit()
-
- def _event_loop(self):
- """The actor's event loop which is called continously to handle
- incoming messages, one at the time."""
- message = self._actor_inbox.get()
- response = self._react(message)
- if 'reply_to' in message:
- message['reply_to'].set(response)
+ """The Greenlet main method"""
+ self.actor_runnable = True
+ while self.actor_runnable:
+ message = self.actor_inbox.get()
+ response = self._react(message)
+ if 'reply_to' in message:
+ message['reply_to'].set(response)
def _react(self, message):
"""Reacts to messages sent to the actor."""
+ if message.get('command') == 'get_attributes':
+ return self._get_attributes()
+ if message.get('command') == 'stop':
+ return self.stop()
if message.get('command') == 'call':
return getattr(self, message['attribute'])(
*message['args'], **message['kwargs'])
@@ -146,11 +149,19 @@ def react(self, message):
"""May be implemented for the actor to handle custom messages."""
raise NotImplementedError
- def get_attributes(self):
- """Returns a dict where the keys are all the available attributes and
- the value is whether the attribute is callable."""
+ def _is_exposable_attribute(self, attr):
+ """
+ Returns true for any attribute name that may be exposed through
+ :class:`ActorProxy`.
+ """
+ return not attr.startswith('_')
+
+ def _get_attributes(self):
+ """Gathers attribute information needed by :class:`ActorProxy`."""
result = {}
for attr in dir(self):
- if not attr.startswith('_'):
- result[attr] = callable(getattr(self, attr))
+ if self._is_exposable_attribute(attr):
+ result[attr] = {
+ 'callable': callable(getattr(self, attr)),
+ }
return result
View
@@ -1,59 +0,0 @@
-import gevent
-
-class Future(object):
- """
- A :class:`Future` is a handle to a value which will be available in the
- future.
-
- Typically returned by calls to actor methods or accesses to actor fields.
-
- To get hold of the encapsulated value, call :meth:`Future.get()`.
- """
- def __init__(self, connection):
- self.connection = connection
-
- def __str__(self):
- return str(self.get())
-
- def get(self, block=True, timeout=None):
- """
- Get the value encapsulated by the future.
-
- If *block* is :class:`True`, it will block until the value is available
- or the *timeout* in seconds is reached.
-
- If *block* is :class:`False` it will immediately return the value if
- available or None if not.
- """
- try:
- return self.connection.get(block, timeout)
- except gevent.Timeout:
- return None
-
- def wait(self, timeout=None):
- """
- Block until the future is available.
-
- An alias for :meth:`get`, but with a name that is more describing if
- you do not care about the return value.
- """
- return self.get(timeout=timeout)
-
-
-def get_all(futures, timeout=None):
- """
- Get all the values encapsulated by the given futures.
-
- :attr:`timeout` has the same behaviour as for :meth:`Future.get`.
- """
- return map(lambda future: future.wait(timeout), futures)
-
-
-def wait_all(futures, timeout=None):
- """
- Block until all the given features are available.
-
- An alias for :func:`get_all`, but with a name that is more describing if
- you do not care about the return values.
- """
- return get_all(futures, timeout)
Oops, something went wrong.

0 comments on commit 3460225

Please sign in to comment.