Permalink
Browse files

Merge branch 'release/3.0.8'

  • Loading branch information...
javipalanca committed Oct 2, 2018
2 parents 7025343 + 45e600f commit afd73ea35dd3d7a1ae123fddaa2c424dc23fd8e6
Showing with 341 additions and 17 deletions.
  1. +8 −0 HISTORY.rst
  2. +9 −0 docs/spade.rst
  3. +91 −0 examples/performance.py
  4. +1 −0 requirements.txt
  5. +1 −0 requirements_dev.txt
  6. +1 −1 setup.cfg
  7. +1 −1 setup.py
  8. +1 −1 spade/__init__.py
  9. +39 −8 spade/agent.py
  10. +8 −2 spade/behaviour.py
  11. +67 −0 spade/container.py
  12. +2 −0 spade/message.py
  13. +2 −2 tests/test_behaviour.py
  14. +106 −0 tests/test_container.py
  15. +4 −2 tests/utils.py
View
@@ -2,6 +2,14 @@
History
=======
3.0.8 (2018-10-02)
------------------
* Added a container mechanism to speedup local sends.
* Added performance example.
* Improved API doc.
* Added container tests.
3.0.7 (2018-09-27)
------------------
View
@@ -20,6 +20,14 @@ spade.behaviour module
:undoc-members:
:show-inheritance:
spade.container module
----------------------
.. automodule:: spade.container
:members:
:undoc-members:
:show-inheritance:
spade.message module
--------------------
@@ -59,3 +67,4 @@ spade.web module
:members:
:undoc-members:
:show-inheritance:
View
@@ -0,0 +1,91 @@
import getpass
import time
from sys import getsizeof
from spade.agent import Agent
from spade.behaviour import OneShotBehaviour
from spade.message import Message
class RecvBehav(OneShotBehaviour):
async def run(self):
self.agent.n = 0
while self.agent.n < self.agent.nmax:
await self.receive(timeout=1000)
self.agent.n += 1
class SendBehav(OneShotBehaviour):
async def run(self):
msg = Message(to=self.agent.receiver_jid, body=self.agent.body)
for _ in range(self.agent.nmax):
await self.send(msg)
class Receiver(Agent):
def setup(self):
self.add_behaviour(RecvBehav())
class Sender(Agent):
def setup(self):
self.add_behaviour(SendBehav())
def run_experiment(credentials, use_container=True, num_msg=1000, body="0"):
sender_jid = credentials["sender_jid"]
sender_passwd = credentials["sender_passwd"]
recv_jid = credentials["recv_jid"]
recv_passwd = credentials["recv_passwd"]
receiver = Receiver(recv_jid, recv_passwd, use_container=use_container)
sender = Sender(sender_jid, sender_passwd, use_container=use_container)
sender.receiver_jid = recv_jid
receiver.n = 0
receiver.nmax = num_msg
sender.nmax = num_msg
sender.body = body
receiver.start(auto_register=True)
sender.start()
print("Go")
t1 = time.time()
while receiver.n < num_msg:
time.sleep(0.1)
t2 = time.time()
size = getsizeof(body)
if use_container:
print("{} Messages of size {} bytes received w/container: {}".format(receiver.n, size, t2 - t1))
else:
print("{} Messages of size {} bytes received wo/container: {}".format(receiver.n, size, t2 - t1))
sender.stop()
receiver.stop()
if __name__ == "__main__":
agent_credentials = {
"sender_jid": input("SenderAgent JID> "),
"sender_passwd": getpass.getpass(),
"recv_jid": input("ReceiverAgent JID> "),
"recv_passwd": getpass.getpass()
}
run_experiment(agent_credentials, use_container=True)
run_experiment(agent_credentials, use_container=False)
run_experiment(agent_credentials, use_container=True, num_msg=10000)
run_experiment(agent_credentials, use_container=False, num_msg=10000)
run_experiment(agent_credentials, use_container=True, body="0" * 1000)
run_experiment(agent_credentials, use_container=False, body="0" * 1000)
run_experiment(agent_credentials, use_container=True, num_msg=10000, body="0" * 1000)
run_experiment(agent_credentials, use_container=False, num_msg=10000, body="0" * 1000)
run_experiment(agent_credentials, use_container=True, num_msg=10000, body="0" * 100000)
run_experiment(agent_credentials, use_container=True, num_msg=100000, body="0" * 100000)
View
@@ -5,3 +5,4 @@ aiohttp_jinja2>=0.14.0
jinja2>=2.9.6
jinja2-time==0.2.0
timeago==1.0.8
singletonify==0.2.3
View
@@ -23,3 +23,4 @@ docutils==0.12
sphinx_rtd_theme==0.4.0
testfixtures==6.3.0
pytest-aiohttp==0.3.0
docutils==0.12
View
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 3.0.7
current_version = 3.0.8
commit = True
tag = False
View
@@ -27,7 +27,7 @@ def parse_requirements(filename):
setup(
name='spade',
version='3.0.7',
version='3.0.8',
description="Smart Python Agent Development Environment",
long_description=readme + '\n\n' + history,
author="Javi Palanca",
View
@@ -7,6 +7,6 @@
__author__ = """Javi Palanca"""
__email__ = 'jpalanca@gmail.com'
__version__ = '3.0.7'
__version__ = '3.0.8'
__all__ = ["agent", "behaviour", "message", "template"]
View
@@ -10,6 +10,7 @@
import aioxmpp.ibr as ibr
from aioxmpp.dispatcher import SimpleMessageDispatcher
from spade.container import Container
from spade.message import Message
from spade.presence import PresenceManager
from spade.trace import TraceStore
@@ -24,7 +25,7 @@ class AuthenticationFailure(Exception):
class Agent(object):
def __init__(self, jid, password, verify_security=False, loop=None):
def __init__(self, jid, password, verify_security=False, use_container=True, loop=None):
"""
Creates an agent
@@ -41,6 +42,12 @@ def __init__(self, jid, password, verify_security=False, loop=None):
self.behaviours = []
self._values = {}
if use_container:
self.container = Container()
self.container.register(self)
else:
self.container = None
self.traces = TraceStore(size=1000)
if loop:
@@ -63,6 +70,15 @@ def __init__(self, jid, password, verify_security=False, loop=None):
# Web service
self.web = WebApp(agent=self)
def set_container(self, container):
"""
Sets the container to which the agent is attached
Args:
container (spade.container.Container): the container to be attached to
"""
self.container = container
def start(self, auto_register=True):
"""
Starts the agent. This fires some actions:
@@ -104,6 +120,7 @@ def start(self, auto_register=True):
self._start()
def _start(self):
""" Finish the start process."""
self.aiothread.start()
self._alive.set()
# register a message callback here
@@ -292,8 +309,8 @@ def _message_received(self, msg):
"""
Callback run when an XMPP Message is reveived.
This callback delivers the message to every behaviour
that is waiting for it using their templates match.
the aioxmpp.Message is converted to spade.message.Message
that is waiting for it. First, the aioxmpp.Message is
converted to spade.message.Message
Args:
msg (aioxmpp.Messagge): the message just received.
@@ -302,9 +319,23 @@ def _message_received(self, msg):
list(asyncio.Future): a list of futures of the append of the message at each matched behaviour.
"""
logger.debug(f"Got message: {msg}")
msg = Message.from_node(msg)
return self.dispatch(msg)
def dispatch(self, msg):
"""
Dispatch the message to every behaviour that is waiting for
it using their templates match.
Args:
msg (spade.message.Messagge): the message to dispatch.
Returns:
list(asyncio.Future): a list of futures of the append of the message at each matched behaviour.
"""
logger.debug(f"Got message: {msg}")
futures = []
matched = False
for behaviour in (x for x in self.behaviours if x.match(msg)):
@@ -340,7 +371,7 @@ def __init__(self, agent, loop, *args, **kwargs):
logger=logging.getLogger(agent.jid.localpart))
def connect(self): # pragma: no cover
""" """
""" connect and authenticate to the XMPP server. Sync mode. """
try:
self.conn_coro = self.client.connected()
aenter = type(self.conn_coro).__aenter__(self.conn_coro)
@@ -351,7 +382,7 @@ def connect(self): # pragma: no cover
"Could not authenticate the agent. Check user and password or use auto_register=True")
async def async_connect(self): # pragma: no cover
""" """
""" connect and authenticate to the XMPP server. Async mode. """
try:
self.conn_coro = self.client.connected()
aenter = type(self.conn_coro).__aenter__(self.conn_coro)
@@ -362,15 +393,15 @@ def connect(self): # pragma: no cover
"Could not authenticate the agent. Check user and password or use auto_register=True")
def run(self):
""" """
""" run event loop """
if not self.agent.external_loop:
self.loop_exited.set()
self.loop.run_forever()
logger.debug("Loop stopped.")
self.loop_exited.clear()
def finalize(self):
""" """
""" Discconnect from XMPP server and close the event loop. """
if self.agent.is_alive():
# Disconnect from XMPP server
self.client.stop()
View
@@ -257,11 +257,17 @@ def mailbox_size(self):
if not msg.sender:
msg.sender = str(self.agent.jid)
logger.debug(f"Adding agent's jid as sender to message: {msg}")
aioxmpp_msg = msg.prepare()
await self.agent.client.send(aioxmpp_msg)
if self.agent.container:
await self.agent.container.send(msg, self)
else:
await self._xmpp_send(msg)
msg.sent = True
self.agent.traces.append(msg, category=str(self))
async def _xmpp_send(self, msg):
aioxmpp_msg = msg.prepare()
await self.agent.client.send(aioxmpp_msg)
async def receive(self, timeout=None):
"""
Receives a message for this behaviour.
View
@@ -0,0 +1,67 @@
from singletonify import singleton
@singleton()
class Container(object):
"""
The container class allows agents to exchange messages
without using the XMPP socket if they are in the same
process.
The container is a singleton.
"""
def __init__(self):
self.__agents = {}
def reset(self):
""" Empty the container by unregistering all the agents. """
self.__agents = {}
def register(self, agent):
"""
Register a new agent.
Args:
agent (spade.agent.Agent): the agent to be registered
"""
self.__agents[str(agent.jid)] = agent
agent.set_container(self)
def has_agent(self, jid):
"""
Check if an agent is registered in the container.
Args:
jid (str): the jid of the agent to be checked.
Returns:
bool: wether the agent is or is not registered.
"""
return jid in self.__agents
def get_agent(self, jid):
"""
Returns a registered agent
Args:
jid (str): the identifier of the agent
Returns:
spade.agent.Agent: the agent you were looking for
Raises:
KeyError: if the agent is not found
"""
return self.__agents[jid]
async def send(self, msg, behaviour):
"""
This method sends the message using the container mechanism
when the receiver is also registered in the container. Otherwise,
it uses the XMPP send method from the original behaviour.
Args:
msg (spade.message.Message): the message to be sent
behaviour: the behaviour that is sending the message
"""
to = str(msg.to)
if to in self.__agents:
self.__agents[to].dispatch(msg)
else:
await behaviour._xmpp_send(msg)
View
@@ -106,6 +106,7 @@ def sender(self, jid):
def thread(self):
"""
Get Thread of the message
Returns:
str: thread id
"""
@@ -115,6 +116,7 @@ def thread(self):
def thread(self, value):
"""
Set thread id of the message
Args:
value (str): the thread id
Oops, something went wrong.

0 comments on commit afd73ea

Please sign in to comment.