Skip to content

Commit

Permalink
Some improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
danielBCN committed Mar 6, 2017
1 parent 49e1ae7 commit 5f75886
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 31 deletions.
15 changes: 11 additions & 4 deletions doc/remote_tuto.rst
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,17 @@ And ``pyactor\examples\Remote\s4_clientb.py``:
.. literalinclude:: ../examples/Remote/s4_clientb.py
:linenos:

Here we have a registry where Servers can be bind. The registry module starts an
actor which is the registry itself. The first client, binds a server to the
registry and waits for queries. The second one, uses the registry to find the
first one and send work to its server.
In this example we have a registry where Servers can be bind. The registry
module starts an actor which is the registry itself to which servers can be
bind and clients look for servers. The first client, binds a server to the
registry and waits for queries to that server. The second one, uses the registry
to find the first one and spawn a server instance on it to afterwards send work
to it.

In order to execute the second client repeatedly without having to restart
all the processes, before spawning the server remotely, it checks if the first
client has already the server by using the method ``has_actor`` on the
remote_host.



Expand Down
12 changes: 11 additions & 1 deletion doc/tutorial.rst
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,17 @@ blocked with another job (an I/O call or a synchronous call to another actor).
To make one method execute parallel, you need to specify it in the class attribute
_parallel, which is a list. The method must also be in one of the lists _tell or
_ask. The methods with this tag will be executed in new threads so their execution
do not interfere with other queries.
do not interfere with other queries. That is, the actor can attend other queries
while executing the parallel method.

As you could think, executing methods of the same actor at the same time can compromise
the integrity of data. PyActor ensures that only one thread is executing on an
actor at the same time, allowing other threads to execute when the one executing
is blocked with some call. This prevents two threads from accessing the same data
at a time, but is up to the programmer to prevent the data to change during the
execution of a method if that is not intended, as a method could modify a property
of the actor while a parallel, that operates with that data, is blocked, leading
to an inconsistency.

In this example we have three classes: File, Web and Workload. File represents a
server that serve the download of files. Simulates the work with a sleep.
Expand Down
4 changes: 2 additions & 2 deletions examples/Remote/s4_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

if __name__ == "__main__":
set_context()
host = create_host('http://127.0.0.1:1679')
host = create_host('http://127.0.0.1:6001')

registry = host.lookup_url('http://127.0.0.1:1277/regis', 'Registry',
registry = host.lookup_url('http://127.0.0.1:6000/regis', 'Registry',
's4_registry')

registry.bind('host1', host)
Expand Down
21 changes: 12 additions & 9 deletions examples/Remote/s4_clientb.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,21 @@ def wait_a_lot(self):

if __name__ == "__main__":
set_context()
host = create_host('http://127.0.0.1:1777')
host = create_host('http://127.0.0.1:6002')

registry = host.lookup_url('http://127.0.0.1:1277/regis', 'Registry',
registry = host.lookup_url('http://127.0.0.1:6000/regis', 'Registry',
's4_registry')
remote_host = registry.lookup('host1')

server = remote_host.spawn('server', 's4_clientb/Server')
z = server.add(6, 7)
print z
server.substract(6, 5)
t = server.add(8, 7)
print t
if remote_host is not None:
if not remote_host.has_actor('server'):
server = remote_host.spawn('server', 's4_clientb/Server')
else:
server = remote_host.lookup('server')
z = server.add(6, 7)
print z
server.substract(6, 5)
t = server.add(8, 7)
print t

try:
registry.unbind('None')
Expand Down
9 changes: 6 additions & 3 deletions examples/Remote/s4_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,21 @@ def unbind(self, name):
raise NotFound()

def lookup(self, name):
return self.actors[name]
if name in self.actors:
return self.actors[name]
else:
return None

def get_all(self):
return self.actors.values()


if __name__ == "__main__":
set_context()
host = create_host('http://127.0.0.1:1277/')
host = create_host('http://127.0.0.1:6000/')

registry = host.spawn('regis', Registry)

print 'host listening at port 1277'
print 'host listening at port 6000'

serve_forever()
20 changes: 18 additions & 2 deletions pyactor/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ class Host(object):
:param str. url: URL that identifies the host and where to find it.
'''
_tell = ['attach_interval', 'detach_interval', 'hello', 'stop_actor']
_ask = ['spawn', 'lookup', 'lookup_url', 'say_hello', 'interval', 'later']
_ask = ['spawn', 'lookup', 'lookup_url', 'say_hello', 'interval', 'later',
'has_actor']
_ref = ['spawn', 'lookup', 'lookup_url', 'interval', 'later']

def __init__(self, url):
Expand Down Expand Up @@ -233,6 +234,16 @@ def spawn(self, aid, klass, param=None):
self.launch_actor(url, new_actor)
return Proxy(new_actor)

def has_actor(self, aid):
'''
Checks if the given id is used in the host by some actor.
:param str. aid: identifier of the actor to check.
:return: True if the id is used within the host.
'''
url = '%s://%s/%s' % (self.transport, self.host_url.netloc, aid)
return url in self.actors.keys()

def lookup(self, aid):
'''
Gets a new proxy that references to the actor of this host
Expand Down Expand Up @@ -487,7 +498,12 @@ def loads(self, param):
proxies for actors from another host.
'''
if isinstance(param, ProxyRef):
return self.lookup_url(param.url, param.klass, param.module)
try:
return self.lookup_url(param.url, param.klass, param.module)
except HostError:
print "Can't lookup for the actor received with the call. \
It does not exist or the url is unreachable.", param
raise HostError
elif isinstance(param, list):
return [self.loads(elem) for elem in param]
elif isinstance(param, dict):
Expand Down
6 changes: 3 additions & 3 deletions pyactor/green_thread/rpcactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class RPCDispatcher(Actor):
'''
This is the actor that will manage the sends and recieves of remote
This is the actor that will manage the sends and receives of remote
queries with other hosts. Each host has one, configured depending on
the scheme specified when created.
'''
Expand All @@ -29,7 +29,7 @@ def __init__(self, url, host, mode):
self.source.start()
self.running = True
self.channel = Channel()
self.pending = {} # Sended to another host
self.pending = {} # Sent to another host
self.executing = {} # Waiting for the response in this server
self.tell = ['stop']
self.ask = []
Expand Down Expand Up @@ -68,7 +68,7 @@ def receive(self, msg):
except Exception:
print (('Error sending a response to %r.'
% (self.executing[msg[RPC_ID]])) +
' Receiver is offline?')
' Is the receiver offline?')
del self.executing[msg[RPC_ID]]
elif msg[TYPE] == FUTURE:
rpc_id = msg[RPC_ID]
Expand Down
6 changes: 3 additions & 3 deletions pyactor/thread/rpcactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class RPCDispatcher(Actor):
'''
This is the actor that will manage the sends and recieves of remote
This is the actor that will manage the sends and receives of remote
queries with other hosts. Each host has one, configured depending on
the scheme specified when created.
'''
Expand All @@ -29,7 +29,7 @@ def __init__(self, url, host, mode):
self.source.start()
self.running = True
self.channel = Channel()
self.pending = {} # Sended to another host
self.pending = {} # Sent to another host
self.executing = {} # Waiting for the response in this server
self.tell = ['stop']
self.ask = []
Expand Down Expand Up @@ -68,7 +68,7 @@ def receive(self, msg):
except Exception:
print (('Error sending a response to %r.'
% (self.executing[msg[RPC_ID]])) +
' Receiver is offline?')
' Is the receiver offline?')
del self.executing[msg[RPC_ID]]
elif msg[TYPE] == FUTURE:
rpc_id = msg[RPC_ID]
Expand Down
12 changes: 10 additions & 2 deletions pyactor/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from threading import current_thread


from pyactor.exceptions import HostError


RABBITU = "guest"
RABBITP = "guest"

Expand Down Expand Up @@ -76,8 +79,13 @@ def get_lock():
def ref_l(f):
def wrap_ref_l(*args):
new_args = list(args)
new_args[0][PARAMS] = get_host().loads(list(args[0][PARAMS]))
return f(*new_args)
try:
new_args[0][PARAMS] = get_host().loads(list(args[0][PARAMS]))
return f(*new_args)
except HostError:
pass
# If there is a problem deserializing the params, the method is not
# executed.
return wrap_ref_l


Expand Down
2 changes: 1 addition & 1 deletion testing/tests_gevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def test_1hostcreation(self):
self.assertEqual(self.h.actor.tell, ['attach_interval',
'detach_interval',
'hello', 'stop_actor', 'stop'])
self.assertEqual(self.h.actor.ask, ['say_hello'])
self.assertEqual(self.h.actor.ask, ['say_hello', 'has_actor'])
self.assertEqual(self.h.actor.ask_ref, ['spawn', 'interval', 'lookup',
'lookup_url', 'later'])
with self.assertRaises(Exception):
Expand Down
2 changes: 1 addition & 1 deletion testing/tests_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def test_1hostcreation(self):
self.assertEqual(self.h.actor.tell, ['attach_interval',
'detach_interval', 'hello',
'stop_actor', 'stop'])
self.assertEqual(self.h.actor.ask, ['say_hello'])
self.assertEqual(self.h.actor.ask, ['say_hello', 'has_actor'])
self.assertEqual(self.h.actor.ask_ref, ['spawn', 'interval', 'lookup',
'lookup_url', 'later'])
with self.assertRaises(Exception):
Expand Down

0 comments on commit 5f75886

Please sign in to comment.