# An example of the Active Object pattern in Python

See http://code.activestate.com/recipes/365292-active-objects/

<img src ="AOP.gif">

In [5]:
"""
An ActiveObject forwards messages to an internal passive object
running on its own thread.

The passive object processes these messages sequentially, and returns the results
or any exceptions to the caller via an AsyncResult object.
"""

from threading import Thread, Event, RLock
from Queue import Queue

class AsyncResult:
    """Represents an asynchronous operation that may not have completed yet.

    The producer calls succeed() or fail(); consumers either read ``result``
    (which blocks until completion) or register callbacks.

    Attributes:
        completed: True once succeed() or fail() has run.
        failed: True when the operation ended through fail().
    """

    def __init__(self):
        self.completed = False
        self.failed = False
        self.__wait = Event()
        self.__callbacks = []
        self.__errbacks = []
        self.__retval = None
        self.__error = None
        self.__lock = RLock()

    def complete(self):
        """Mark the operation as finished and wake up every waiter."""
        with self.__lock:
            self.completed = True
            self.__wait.set()

    def succeed(self, retval):
        """Record *retval* as the outcome and run the success callbacks."""
        self.__retval = retval
        self.complete()
        # Detach the lists first so they are cleared even if a callback raises.
        callbacks = self.__callbacks
        self.clearCallbacks()
        for callback in callbacks:
            callback(retval)

    def fail(self, error):
        """Record *error* as the outcome and run the error callbacks."""
        self.__error = error
        self.failed = True
        self.complete()
        errbacks = self.__errbacks
        self.clearCallbacks()
        for errback in errbacks:
            errback(error)

    def clearCallbacks(self):
        """Forget every registered callback and errback."""
        self.__callbacks = []
        self.__errbacks = []

    def addCallback(self, callback, errback=None):
        """Register *callback* for success and, optionally, *errback* for failure.

        If the operation already succeeded, *callback* is invoked immediately
        with the stored return value.
        """
        with self.__lock:
            if not self.completed:
                self.__callbacks.append(callback)
            elif not self.failed:
                # Was `self.retval`, an attribute that does not exist.
                callback(self.__retval)
            if errback is not None:
                self.addErrback(errback)

    def addErrback(self, errback):
        """Register *errback* for failure.

        If the operation already failed, *errback* is invoked immediately with
        the stored error.
        """
        with self.__lock:
            if not self.completed:
                self.__errbacks.append(errback)
            elif self.failed:
                # Was `self.error`, an attribute that does not exist.
                errback(self.__error)

    @property
    def result(self):
        """Block until completion, then return the value or raise the error."""
        self.__wait.wait()
        if self.failed:
            raise self.__error
        return self.__retval


       
class Message:
    """Represents a message forwarded to a passive object by an active object.

    Calling the message enqueues one invocation of ``fun`` and returns the
    AsyncResult through which the outcome will be delivered.
    """

    def __init__(self, fun, queue):
        self.fun = fun
        self.queue = queue

    def __call__(self, *args, **kwargs):
        """Queue an invocation of ``fun`` with the given arguments.

        Each call enqueues its own Message, so a saved reference (for example
        ``f = active.echo``) can be called several times without one call
        overwriting the arguments or the result of another.
        """
        pending = Message(self.fun, self.queue)
        pending.args = args
        pending.kwargs = kwargs
        pending.result = AsyncResult()
        # Mirror the latest call on this object, as the attributes were
        # previously stored here.
        self.args = args
        self.kwargs = kwargs
        self.result = pending.result
        self.queue.put(pending)
        return pending.result

    def call(self):
        """Run ``fun`` with the stored arguments and return its value."""
        return self.fun(*self.args, **self.kwargs)

class ActiveObject:
    """An object that handles messages sequentially on a separate thread.

    Attribute access is forwarded to an internal instance of ``klass``; each
    callable attribute is returned wrapped in a Message, so calling it queues
    the work and returns an AsyncResult.
    Call stop() to terminate the object's internal message loop.
    """

    def __init__(self, klass, *args, **kwargs):
        self.__obj = klass(*args, **kwargs)
        self.__queue = Queue()
        # Initialised before the worker starts so the flag always exists.
        self.stopped = False
        self.__thread = Thread(target=self.__processQueue)
        self.__thread.start()

    def stop(self):
        """Ask the message loop to exit once earlier messages are processed."""
        self.__queue.put(StopIteration)

    def __processQueue(self):
        """Worker loop: run queued messages until the stop sentinel arrives."""
        while True:
            message = self.__queue.get()
            # StopIteration (the class itself) is the stop sentinel.
            if message is StopIteration:
                self.stopped = True
                break
            try:
                retval = message.call()
            except Exception as e:
                message.result.fail(e)
            else:
                message.result.succeed(retval)

    def __getattr__(self, attrname):
        """Return a Message wrapping the passive object's callable attribute.

        Raises:
            AttributeError: if the loop has stopped, the passive object lacks
                the attribute, or the attribute is not callable.
        """
        if self.stopped:
            raise AttributeError("Object is no longer active.")
        fun = getattr(self.__obj, attrname)
        if not callable(fun):
            raise AttributeError("Active object does not support this function.")
        return Message(fun, self.__queue)
            
            
class Server:
    """Minimal passive class used to demonstrate ActiveObject."""

    def echo(self, msg):
        """Return *msg* prefixed with 'returning:'."""
        reply = 'returning:' + msg
        return reply
def on_success(result):
    """Success callback for the demo: print the value that was returned."""
    print(result)
def on_fail(error):
    """Error callback for the demo: print the error that was reported."""
    print(error)
    
active = ActiveObject(Server)
try:
    # A call on the active object returns an AsyncResult immediately. Its
    # `completed` flag can be polled while doing other work, e.g.:
    #     asyncResult = active.echo('hola !')
    #     while not asyncResult.completed:
    #         pass  # do something else until the result is available
    #     print(asyncResult.result)

    # Reading `.result` waits for the result and returns it when available.
    print(active.echo('pedro').result)

    # addCallback(callback, errback) calls callback if the operation
    # succeeded, and errback if it failed.
    active.echo('pedro now with callbacks').addCallback(on_success, on_fail)
finally:
    # Explicitly terminate the message loop used by the active object, even
    # when a call above raised: its worker thread is not a daemon thread.
    active.stop()

returning:pedro
returning:pedro now with callbacks


# Simple Actor model

In [7]:
from Queue import Queue
from threading import Thread
class Channel(Queue):
    """A Queue exposed through send/receive instead of put/get."""

    def __init__(self):
        Queue.__init__(self)

    def send(self, msg):
        """Append *msg* to the channel."""
        self.put(msg)

    def receive(self, timeout=None):
        """Remove and return the oldest message, blocking until one exists.

        *timeout* is passed through to Queue.get as its timeout.
        """
        return self.get(True, timeout)
    
class Actor:
    """Runs a passive object behind a channel that one thread drains.

    A message is a tuple ``(method_name, params)``; the method name 'stop' is
    reserved and ends the message loop.
    """

    def __init__(self, klass, *args, **kwargs):
        self.channel = Channel()
        self.__obj = klass(*args, **kwargs)
        self.running = True

    def __processQueue(self):
        """Thread body: dispatch incoming messages until `running` is cleared."""
        while self.running:
            self.receive(self.channel.receive())
        print('stop ...')

    def is_alive(self):
        """Return the `running` flag (True from construction until 'stop')."""
        return self.running

    def receive(self, msg):
        """Handle one message: stop the loop or invoke the named method.

        Exceptions raised while looking up or running the method are printed
        rather than propagated, so the message loop keeps going.
        """
        if msg[0] == 'stop':
            self.running = False
            return
        try:
            invoke = getattr(self.__obj, msg[0])
            invoke(*msg[1])
        except Exception as e:
            print(e)

    def get_proxy(self):
        """Return the channel through which messages reach this actor."""
        return self.channel

    def run(self):
        """Start the worker thread; it is kept on `self.thread`."""
        self.thread = Thread(target=self.__processQueue)
        self.thread.start()
      
class Proxy:
    """Turns any method call made on it into a message sent over a channel.

    ``proxy.name(*args)`` sends the tuple ``(name, args)``. No check is made
    that the receiver actually defines ``name``.
    """

    def __init__(self, channel):
        self.__channel = channel

    def __getattr__(self, methodname):
        """Return a sender bound to *methodname*.

        Each lookup yields its own callable, so a saved reference keeps sending
        the name it was created for even after other names are looked up.
        """
        # Let special-method probes (repr, copy, pickle, ...) fail normally
        # instead of turning them into messages.
        if methodname.startswith('__'):
            raise AttributeError(methodname)
        # Still recorded because __call__ reads it.
        self.method = methodname

        def tell(*args, **kwargs):
            # Keyword arguments are accepted but not part of the message.
            self.__channel.send((methodname, args))
        return tell

    def __call__(self, *args, **kwargs):
        """Send a message for the most recently looked-up method name."""
        msg = (self.method, args)
        self.__channel.send(msg)
        

class Echo:
    """Sample passive class driven through an Actor in this demo."""

    def echo(self, msg):
        """Print *msg*."""
        print(msg)

    def bye(self):
        """Print 'bye'."""
        print('bye')
        
# Demo: wrap an Echo instance in an actor, start its worker thread, and drive
# it through a proxy built on the actor's channel.
actor1 = Actor(Echo)
actor1.run()
p = Proxy(actor1.get_proxy())
p.echo('hola amigo !!')
# Echo has no 'kk' method; the proxy sends the message anyway and the actor
# prints the resulting error instead of raising.
p.kk()
p.bye()
# 'stop' is handled by Actor.receive itself and ends the message loop.
p.stop()

hola amigo !!
Echo instance has no attribute 'kk'
bye
stop ...


Now, the proxy intercepts every call and tries to invoke it on the actor, even when the method is not defined.
Let's improve the Actor class so that one-way asynchronous methods are declared explicitly and attached to the proxy with setattr.

In [23]:
from Queue import Queue
from threading import Thread

METHOD = 0
PARAMS = 1

class Channel(Queue):
    """A Queue exposed through send/receive instead of put/get."""

    def __init__(self):
        Queue.__init__(self)

    def send(self, msg):
        """Append *msg* to the channel."""
        self.put(msg)

    def receive(self, timeout=None):
        """Remove and return the oldest message, blocking until one exists.

        *timeout* is passed through to Queue.get as its timeout.
        """
        return self.get(True, timeout)
    
class Actor:
    """Runs a passive object behind a channel that one thread drains.

    Messages are tuples indexed by the module constants METHOD and PARAMS; the
    method name 'stop' is reserved and ends the message loop.
    """

    def __init__(self, klass, *args, **kwargs):
        self.__channel = Channel()
        self.__obj = klass(*args, **kwargs)
        # The wrapped class's own list of one-way method names.
        self.__async = klass._async
        self.running = True

    def __processQueue(self):
        """Thread body: dispatch incoming messages until `running` is cleared."""
        while self.running:
            self.receive(self.__channel.receive())
        print('stop ...')

    def is_alive(self):
        """Return the `running` flag (True from construction until 'stop')."""
        return self.running

    def receive(self, msg):
        """Handle one message: stop the loop or invoke the named method.

        Exceptions raised while looking up or running the method are printed
        rather than propagated, so the message loop keeps going.
        """
        if msg[METHOD] == 'stop':
            self.running = False
            return
        try:
            invoke = getattr(self.__obj, msg[METHOD])
            invoke(*msg[PARAMS])
        except Exception as e:
            print(e)

    def get_proxy(self):
        """Return the pair (channel, method names).

        The names are returned as a fresh list, so a caller that extends it
        does not alter the wrapped class's `_async` attribute.
        """
        return self.__channel, list(self.__async)

    def run(self):
        """Start the worker thread; it is kept on `self.thread`."""
        self.thread = Thread(target=self.__processQueue)
        self.thread.start()
      
class Proxy:
    """Client-side handle exposing an actor's one-way methods.

    Built from the pair ``(channel, methods)``; one TellWrapper attribute is
    attached per method name, plus one for 'stop'.
    """

    def __init__(self, channel_and_methods):
        channel, methods = channel_and_methods
        self.__channel = channel
        # Build a new list instead of appending to `methods`: the caller's
        # list would otherwise gain another 'stop' for every proxy created.
        for method in list(methods) + ['stop']:
            setattr(self, method, TellWrapper(self.__channel, method))


class TellWrapper():
    """Callable that sends ``(method, args)`` over a channel on every call."""

    def __init__(self, channel, method):
        self.__channel = channel
        self.__method = method

    def __call__(self, *args, **kwargs):
        # Keyword arguments are accepted but not part of the message.
        msg = (self.__method, args)
        self.__channel.send(msg)
        

class Echo:
    """Sample passive class driven through an Actor in this demo."""

    # Names of the methods an Actor exposes as one-way calls.
    _async = ['echo', 'bye']

    def echo(self, msg):
        """Print *msg*."""
        print(msg)

    def bye(self):
        """Print 'bye'."""
        print('bye')
        
# Demo: the proxy now exposes only the methods listed in Echo._async, plus
# 'stop'.
actor1 = Actor(Echo)
actor1.run()
p = Proxy(actor1.get_proxy())
p.echo('hola amigo !!')
# p.kk() is left disabled: this Proxy has no 'kk' attribute, so the call
# would raise AttributeError on the caller's side.
#p.kk()
p.bye()
# 'stop' is handled by Actor.receive itself and ends the message loop.
p.stop()

hola amigo !!
bye
stop ...


The next step is to create actors in a uniform way through a spawn method. Developers should not instantiate actors like normal classes. Let's create a Host class that spawns Actors:

In [5]:
from Queue import Queue
from threading import Thread

METHOD = 0
PARAMS = 1

class Channel(Queue):
    """A Queue exposed through send/receive instead of put/get."""

    def __init__(self):
        Queue.__init__(self)

    def send(self, msg):
        """Append *msg* to the channel."""
        self.put(msg)

    def receive(self, timeout=None):
        """Remove and return the oldest message, blocking until one exists.

        *timeout* is passed through to Queue.get as its timeout.
        """
        return self.get(True, timeout)
    
class Actor(object):
    """Runs a passive object behind a channel that one thread drains.

    Messages are tuples indexed by the module constants METHOD and PARAMS; the
    method name 'stop' is reserved and ends the message loop.
    """

    def __init__(self, klass, *args, **kwargs):
        self.__channel = Channel()
        self.__obj = klass(*args, **kwargs)
        # The wrapped class's own list of one-way method names.
        self.__async = klass._async
        self.running = True

    def __processQueue(self):
        """Thread body: dispatch incoming messages until `running` is cleared."""
        while self.running:
            self.receive(self.__channel.receive())
        print('stop ...')

    def is_alive(self):
        """Return the `running` flag (True from construction until 'stop')."""
        return self.running

    def receive(self, msg):
        """Handle one message: stop the loop or invoke the named method.

        Exceptions raised while looking up or running the method are printed
        rather than propagated, so the message loop keeps going.
        """
        if msg[METHOD] == 'stop':
            self.running = False
            return
        try:
            invoke = getattr(self.__obj, msg[METHOD])
            invoke(*msg[PARAMS])
        except Exception as e:
            print(e)

    def get_proxy(self):
        """Return the pair (channel, method names).

        The names are returned as a fresh list, so a caller that extends it
        does not alter the wrapped class's `_async` attribute.
        """
        return self.__channel, list(self.__async)

    def run(self):
        """Start the worker thread; it is kept on `self.thread`."""
        self.thread = Thread(target=self.__processQueue)
        self.thread.start()
        
          
class Proxy:
    """Client-side handle exposing an actor's one-way methods.

    Built from the pair ``(channel, methods)``; one TellWrapper attribute is
    attached per method name, plus one for 'stop'.
    """

    def __init__(self, channel_and_methods):
        channel, methods = channel_and_methods
        self.__channel = channel
        # Build a new list instead of appending to `methods`: the caller's
        # list would otherwise gain another 'stop' for every proxy created
        # (Host.lookup creates a proxy per call).
        for method in list(methods) + ['stop']:
            setattr(self, method, TellWrapper(self.__channel, method))


class TellWrapper():
    """Callable that sends ``(method, args)`` over a channel on every call."""

    def __init__(self, channel, method):
        self.__channel = channel
        self.__method = method

    def __call__(self, *args, **kwargs):
        # Keyword arguments are accepted but not part of the message.
        msg = (self.__method, args)
        self.__channel.send(msg)

class AlreadyExists(Exception):
    """Raised by Host.spawn when the requested id is already registered."""


class NotFound(Exception):
    """Raised by Host.lookup when no actor is registered under the id."""
        
class Host(object):
    """Registry that creates actors and hands out proxies to them.

    Attributes:
        actors: dict mapping an id to the Actor spawned under it.
    """

    def __init__(self):
        self.actors = {}

    def spawn(self, id, kclass):
        """Create and start an actor wrapping ``kclass()`` and return a Proxy.

        Raises:
            AlreadyExists: if *id* is already registered. The check happens
                before anything is instantiated.
        """
        if id in self.actors:
            raise AlreadyExists(id)
        new_actor = Actor(kclass)
        self.actors[id] = new_actor
        new_actor.run()
        return Proxy(new_actor.get_proxy())

    def lookup(self, id):
        """Return a new Proxy for the actor registered under *id*.

        Raises:
            NotFound: if no actor is registered under *id*.
        """
        if id not in self.actors:
            raise NotFound(id)
        return Proxy(self.actors[id].get_proxy())
          
  

class Echo:
    """Sample passive class spawned through a Host in this demo."""

    # Names of the methods an Actor exposes as one-way calls.
    _async = ['echo', 'bye']

    def echo(self, msg):
        """Print *msg*."""
        print(msg)

    def bye(self):
        """Print 'bye'."""
        print('bye')
        

# Demo: actors are now created through Host.spawn, which registers the actor
# under an id, starts it and returns a proxy.
h = Host()
e1 = h.spawn('echo1',Echo)
e1.echo('hola amigo !!')
# Leftover from the previous cell; no variable `p` is created in this cell.
#p.kk()
e1.bye()
# 'stop' is handled by Actor.receive itself and ends the message loop.
e1.stop()

hola amigo !!
bye
stop ...


Let's improve encapsulation in the proxy

In [7]:
from Queue import Queue
from threading import Thread

METHOD = 0
PARAMS = 1

class Channel(Queue):
    """A Queue exposed through send/receive instead of put/get."""

    def __init__(self):
        Queue.__init__(self)

    def send(self, msg):
        """Append *msg* to the channel."""
        self.put(msg)

    def receive(self, timeout=None):
        """Remove and return the oldest message, blocking until one exists.

        *timeout* is passed through to Queue.get as its timeout.
        """
        return self.get(True, timeout)
    
class Actor(object):
    """Runs a passive object behind a channel that one thread drains.

    Messages are tuples indexed by the module constants METHOD and PARAMS; the
    method name 'stop' is reserved and ends the message loop.

    Attributes:
        url: the identifier given at construction.
        channel: the Channel on which messages for this actor arrive.
    """

    def __init__(self, url, klass, *args, **kwargs):
        self.url = url
        self.channel = Channel()
        self.__obj = klass(*args, **kwargs)
        # The wrapped class's own list of one-way method names.
        self.__async = klass._async
        self.running = True

    def __processQueue(self):
        """Thread body: dispatch incoming messages until `running` is cleared."""
        while self.running:
            self.receive(self.channel.receive())
        print('stop ...')

    def methods(self):
        """Return the one-way method names as a fresh list.

        A copy is returned so that callers extending it do not alter the
        wrapped class's `_async` attribute.
        """
        return list(self.__async)

    def is_alive(self):
        """Return the `running` flag (True from construction until 'stop')."""
        return self.running

    def receive(self, msg):
        """Handle one message: stop the loop or invoke the named method.

        Exceptions raised while looking up or running the method are printed
        rather than propagated, so the message loop keeps going.
        """
        if msg[METHOD] == 'stop':
            self.running = False
            return
        try:
            invoke = getattr(self.__obj, msg[METHOD])
            invoke(*msg[PARAMS])
        except Exception as e:
            print(e)

    def get_proxy(self):
        """Return the pair (channel, method names)."""
        # The channel is the public `channel` attribute; the former
        # `self.__channel` was never assigned in this class.
        return self.channel, self.methods()

    def run(self):
        """Start the worker thread; it is kept on `self.thread`."""
        self.thread = Thread(target=self.__processQueue)
        self.thread.start()
        
          
class Proxy:
    """Client-side handle exposing an actor's one-way methods.

    Reads the actor's ``channel`` and ``methods()`` and attaches one
    TellWrapper attribute per method name, plus one for 'stop'.
    """

    def __init__(self, actor):
        self.__channel = actor.channel
        # Copy before extending: the list returned by actor.methods() would
        # otherwise gain another 'stop' for every proxy created.
        methods = list(actor.methods())
        methods.append('stop')
        for method in methods:
            setattr(self, method, TellWrapper(self.__channel, method))


class TellWrapper():
    """Callable that sends ``(method, args)`` over a channel on every call."""

    def __init__(self, channel, method):
        self.__channel = channel
        self.__method = method

    def __call__(self, *args, **kwargs):
        # Keyword arguments are accepted but not part of the message.
        msg = (self.__method, args)
        self.__channel.send(msg)

class AlreadyExists(Exception):
    """Raised by Host.spawn when the requested id is already registered."""


class NotFound(Exception):
    """Raised by Host.lookup when no actor is registered under the id."""
        
class Host(object):
    """Registry that creates actors and hands out proxies to them.

    Attributes:
        name: the host name, used as the host part of actor urls.
        actors: dict mapping an id to the Actor spawned under it.
        threads: dict mapping an id to that actor's worker thread.
    """

    def __init__(self, name):
        self.name = name
        self.actors = {}
        self.threads = {}

    def spawn(self, id, kclass):
        """Create and start an actor wrapping ``kclass()`` and return a Proxy.

        Raises:
            AlreadyExists: if *id* is already registered. The check happens
                before anything is instantiated.
        """
        if id in self.actors:
            raise AlreadyExists(id)
        # The url previously embedded the literal text 'name' rather than
        # this host's name.
        url = 'local://' + self.name + '/' + id
        new_actor = Actor(url, kclass)
        new_actor.run()
        self.actors[id] = new_actor
        self.threads[id] = new_actor.thread
        return Proxy(new_actor)

    def lookup(self, id):
        """Return a new Proxy for the actor registered under *id*.

        Raises:
            NotFound: if no actor is registered under *id*.
        """
        if id not in self.actors:
            raise NotFound(id)
        return Proxy(self.actors[id])
          
  

class Echo:
    """Sample passive class spawned through a Host in this demo."""

    # Names of the methods an Actor exposes as one-way calls.
    _async = ['echo', 'bye']

    def echo(self, msg):
        """Print *msg*."""
        print(msg)

    def bye(self):
        """Print 'bye'."""
        print('bye')
        

# Demo: same scenario as before, but the Proxy is now built directly from the
# Actor object by Host.spawn.
h = Host('pedro')
e1 = h.spawn('echo1',Echo)
e1.echo('hola amigo !!')
# Leftover from an earlier cell; no variable `p` is created in this cell.
#p.kk()
e1.bye()
# 'stop' is handled by Actor.receive itself and ends the message loop.
e1.stop()

hola amigo !!
bye
stop ...


OK, now let's add Futures.

In [34]:
from Queue import Queue
from threading import Thread

METHOD = 0
PARAMS = 1
FUTURE = 2

class Channel(Queue):
    """A Queue exposed through send/receive instead of put/get."""

    def __init__(self):
        Queue.__init__(self)

    def send(self, msg):
        """Append *msg* to the channel."""
        self.put(msg)

    def receive(self, timeout=None):
        """Remove and return the oldest message, blocking until one exists.

        *timeout* is passed through to Queue.get as its timeout.
        """
        return self.get(True, timeout)
    
class Actor(object):
    """Runs a passive object behind a channel that one thread drains.

    Messages are tuples indexed by the module constants METHOD, PARAMS and,
    for calls that expect an answer, FUTURE (the channel to reply on). The
    method name 'stop' is reserved and ends the message loop.

    Attributes:
        url: the identifier given at construction.
        channel: the Channel on which messages for this actor arrive.
        tell: names of one-way methods, plus 'stop'.
        ask: names of methods whose result is sent back to the caller.
    """

    def __init__(self, url, klass, *args, **kwargs):
        self.url = url
        self.channel = Channel()
        self.__obj = klass(*args, **kwargs)
        # Build per-actor lists: appending 'stop' to klass._tell itself would
        # grow the class attribute every time an actor is created.
        self.tell = list(klass._tell) + ['stop']
        self.ask = list(klass._ask)
        self.running = True

    def __processQueue(self):
        """Thread body: dispatch incoming messages until `running` is cleared."""
        while self.running:
            self.receive(self.channel.receive())
        print('stop ...')

    def is_alive(self):
        """Return the `running` flag (True from construction until 'stop')."""
        return self.running

    def receive(self, msg):
        """Handle one message: stop the loop or invoke the named method.

        An exception raised by the lookup or the call is printed and becomes
        the result. If the message carries a reply channel, the result is
        always sent on it, including None, so the waiting caller is released.
        Messages without a reply channel never get a reply.
        """
        if msg[METHOD] == 'stop':
            self.running = False
            return
        try:
            invoke = getattr(self.__obj, msg[METHOD])
            result = invoke(*msg[PARAMS])
        except Exception as e:
            result = e
            print(result)
        # One-way messages are two-element tuples with no FUTURE slot.
        if len(msg) > FUTURE:
            msg[FUTURE].send(result)

    def run(self):
        """Start the worker thread; it is kept on `self.thread`."""
        self.thread = Thread(target=self.__processQueue)
        self.thread.start()
        
          
class Proxy:
    """Client-side handle exposing an actor's tell and ask methods.

    Every name in ``actor.tell`` becomes a TellWrapper attribute and every
    name in ``actor.ask`` an AskWrapper attribute, all bound to the actor's
    channel.
    """

    def __init__(self, actor):
        channel = actor.channel
        self.__channel = channel
        for name in actor.tell:
            setattr(self, name, TellWrapper(channel, name))
        for name in actor.ask:
            setattr(self, name, AskWrapper(channel, name))
   
class Future(object):
    """Handle for the answer to one ask-style call on an actor.

    The request is sent on the first call to get(), which then waits for the
    reply on this future's own channel.
    """

    # Class-level defaults for the cached outcome of the first get().
    _resolved = False
    _outcome = None

    def __init__(self, actor_channel, method, params):
        self.channel = Channel()
        self.method = method
        self.params = params
        self.actor_channel = actor_channel

    def get(self):
        """Return the call's result, raising it if it is an Exception.

        The request is sent only once; later calls reuse the stored outcome
        instead of making the actor run the method again.
        """
        if not self._resolved:
            msg = (self.method, self.params, self.channel)
            self.actor_channel.send(msg)
            self._outcome = self.channel.receive()
            self._resolved = True
        if isinstance(self._outcome, Exception):
            raise self._outcome
        return self._outcome
        
        
class TellWrapper:
    """Callable that sends ``(method, args)`` over a channel on every call."""

    def __init__(self, channel, method):
        self.__method = method
        self.__channel = channel

    def __call__(self, *args, **kwargs):
        # Keyword arguments are accepted but not part of the message.
        self.__channel.send((self.__method, args))
        
class AskWrapper:
    """Callable that returns a Future for one method on an actor channel."""

    def __init__(self, channel, method):
        self.__method = method
        self.__channel = channel

    def __call__(self, *args, **kwargs):
        # Keyword arguments are accepted but not passed on to the Future.
        pending = Future(self.__channel, self.__method, args)
        return pending

class AlreadyExists(Exception):
    """Raised by Host.spawn when the requested id is already registered."""


class NotFound(Exception):
    """Raised by Host.lookup when no actor is registered under the id."""
        
class Host(object):
    """Registry that creates actors and hands out proxies to them.

    Attributes:
        name: the host name, used as the host part of actor urls.
        actors: dict mapping an id to the Actor spawned under it.
        threads: dict mapping an id to that actor's worker thread.
    """

    def __init__(self, name):
        self.name = name
        self.actors = {}
        self.threads = {}

    def spawn(self, id, kclass):
        """Create and start an actor wrapping ``kclass()`` and return a Proxy.

        Raises:
            AlreadyExists: if *id* is already registered. The check happens
                before anything is instantiated.
        """
        if id in self.actors:
            raise AlreadyExists(id)
        # The url previously embedded the literal text 'name' rather than
        # this host's name.
        url = 'local://' + self.name + '/' + id
        new_actor = Actor(url, kclass)
        new_actor.run()
        self.actors[id] = new_actor
        self.threads[id] = new_actor.thread
        return Proxy(new_actor)

    def lookup(self, id):
        """Return a new Proxy for the actor registered under *id*.

        Raises:
            NotFound: if no actor is registered under *id*.
        """
        if id not in self.actors:
            raise NotFound(id)
        return Proxy(self.actors[id])
          
  

class Echo:
    """Sample passive class with both one-way and ask-style methods."""

    # Methods exposed as one-way calls.
    _tell = ['echo', 'bye']
    # Methods whose return value is sent back to the caller.
    _ask = ['say_something']

    def echo(self, msg):
        """Print *msg*."""
        print(msg)

    def bye(self):
        """Print 'bye'."""
        print('bye')

    def say_something(self):
        """Return the string 'something'."""
        return 'something'
        

h = Host('pedro')
e1 = h.spawn('echo1', Echo)
try:
    e1.echo('hola amigo !!')
    e1.bye()

    # Despite its name, `future` holds the resolved value: get() sends the
    # request and blocks until the actor replies.
    future = e1.say_something().get()
    print(future)
finally:
    # Stop the actor even if get() raised; otherwise its worker thread would
    # keep waiting for messages.
    e1.stop()

        
    

hola amigo !!
bye
something
stop ...


Now let's try to enable pass-by-reference between actors. Note that when a proxy is passed as a parameter, it must be serialized
and restored at the other end. Note that we must design our system to work across machines.

In [None]:
class Ref:
    """Value object holding a ``url`` and a ``methods`` list."""

    # Class-level defaults; every instance overrides both in __init__.
    url = ''
    methods = []

    def __init__(self, url, methods):
        self.methods = methods
        self.url = url

def dumps(self, param):
    """Return *param* in a form that can be handed to another actor.

    A Ref becomes a new Ref with the same url and methods, a list is
    converted element by element, and any other value is returned unchanged.
    ``self`` is not used here beyond being passed along on recursion.
    """
    if isinstance(param, Ref):
        # Was `Ref(param.url,param,methods())`: a comma in place of the dot,
        # which passed three arguments and called an undefined name.
        return Ref(param.url, param.methods)
    elif isinstance(param, list):
        # The recursive call must forward `self` to match the signature.
        return [dumps(self, elem) for elem in param]
    else:
        return param


def loads(self, param):
    """Turn a received parameter back into a usable object.

    A Ref is resolved to a proxy object, a list is converted element by
    element, and any other value is returned unchanged.

    NOTE(review): this function cannot run as written in the code visible
    here. `_from` is never assigned (its assignment is commented out), and
    `urlparse`, `Auto_Proxy`, `self.objects`, `self._lookup` and
    `self._loads` are not defined in the visible source. The Ref class shown
    above has no `get_aref` method. Confirm against the class this function
    was taken from.
    """
    if isinstance(param, Ref):
        # NOTE(review): with the next line disabled, `_from` is unbound below.
        #_from = controller.get_current()
        if _from == param.get_aref():
            # The reference names the current actor: wrap its object directly.
            aurl = urlparse(_from)
            obj = Auto_Proxy(self.objects[aurl.path].obj, _from)
        else:
            # Otherwise resolve the reference through the lookup helper.
            obj = self._lookup(param.get_aref(), _from)

        return obj
    elif isinstance(param, list):
        # NOTE(review): recurses through `self._loads`, while this function is
        # named `loads` — presumably the same routine; verify.
        return [self._loads(elem) for elem in param]
    else:
        return param