Skip to content
Permalink
Browse files

Tasks are now cancelled in a cleaner way.

  • Loading branch information...
javipalanca committed Jul 17, 2019
1 parent b7bd039 commit 3107418d7523ad35a66506a3ac046672b3e28aba
Showing with 53 additions and 6 deletions.
  1. +10 −2 spade/behaviour.py
  2. +16 −2 spade/container.py
  3. +1 −1 spade/web.py
  4. +26 −1 tests/test_container.py
@@ -1,6 +1,8 @@
import collections
import logging
import traceback
from abc import ABCMeta, abstractmethod
from asyncio import CancelledError
from threading import Event
from datetime import timedelta, datetime
import time
@@ -252,15 +254,21 @@ def join(self, timeout=None):
checks whether behaviour is done or killed,
ortherwise it calls run() coroutine.
"""
cancelled = False
while not self._done() and not self.is_killed():
try:
await self._run()
await asyncio.sleep(0) # relinquish cpu
except CancelledError:
logger.info("Behaviour {} cancelled".format(self))
cancelled = True
except Exception as e:
logger.error("Exception running behaviour {}: {}".format(self, e))
logger.error("Exception running behaviour {behav}: {exc}".format(behav=self, exc=e))
logger.error(traceback.format_exc())
self.kill(exit_code=e)
try:
await self.on_end()
if not cancelled:
await self.on_end()
except Exception as e:
logger.error("Exception running on_end in behaviour {}: {}".format(self, e))
self.kill(exit_code=e)
@@ -1,5 +1,7 @@
import asyncio
import logging
import sys
from contextlib import suppress
from threading import Thread

from singletonify import singleton
@@ -122,17 +124,29 @@ def stop(self):
class AioThread(Thread):
def __init__(self, *args, **kwargs):
self.loop = asyncio.new_event_loop()
self.running = True
super().__init__(*args, **kwargs)

def run(self):
try:
self.loop.run_forever()
if sys.version_info >= (3, 7):
tasks = asyncio.all_tasks(loop=self.loop)
else:
tasks = asyncio.Task.all_tasks(loop=self.loop)
for task in tasks:
task.cancel()
with suppress(asyncio.CancelledError):
self.loop.run_until_complete(task)
self.loop.close()
logger.debug("Loop closed")
except Exception as e: # pragma: no cover
logger.error("Exception in the event loop: {}".format(e))

def finalize(self):
self.loop.call_soon_threadsafe(self.loop.stop)
logger.debug("Loop closed")
if self.running:
self.loop.call_soon_threadsafe(self.loop.stop)
self.running = False


def stop_container():
@@ -177,7 +177,7 @@ def timeago(date):
async def stop_now(self, request):
logger.warning("Stopping agent from web interface.")
await self.agent.stop()
return aioweb.json_response({})
return aioweb.json_response({}) # pragma: no cover

@aiohttp_jinja2.template("internal_tpl_messages.html")
async def get_messages(self, request):
@@ -1,8 +1,10 @@
import asyncio

import aioxmpp
import pytest
from asynctest import MagicMock, CoroutineMock

from spade.behaviour import OneShotBehaviour
from spade.behaviour import OneShotBehaviour, CyclicBehaviour
from spade.container import Container
from spade.message import Message
from tests.utils import make_connected_agent
@@ -113,3 +115,26 @@ def test_unregister():

assert not container.has_agent(str(agent.jid))
assert container.has_agent(str(agent2.jid))


def test_cancel_tasks():
agent = make_connected_agent()

class Behav(CyclicBehaviour):
async def run(self):
await asyncio.sleep(100)
self.has_finished = True

behav = Behav()
behav.has_finished = False
agent.add_behaviour(behaviour=behav)
future = agent.start()
future.result()

assert not behav.has_finished

container = Container()
container.stop()

assert not behav.has_finished

0 comments on commit 3107418

Please sign in to comment.
You can’t perform that action at this time.