Permalink
Browse files

remote: replace autoping with explicit polling

This fixes autobahn timeouts during long-running tests.

The mail problem is that the asyncio eventloop of the remote client is
not running while executing test cases. That causes the autoping
requests from the crossbar to time out, which triggers a disconnect.

We can't easily use autoping only for the exporters, so this implements
explicit polling from the coordinator to each exporter. It uses a new
'version' procedure for that, which is also useful for debugging.

The long-term fix is probably to move the autobahn client event loop
into a python thread, but that is a much larger change with higher risk.

Signed-off-by: Jan Luebbe <jlu@pengutronix.de>
  • Loading branch information...
jluebbe authored and Emantor committed Nov 16, 2017
1 parent 80c45c6 commit e13ba3d7662a216d1455f965c0c3d59611090243
Showing with 79 additions and 13 deletions.
  1. +0 −4 .crossbar/config.yaml
  2. +61 −4 labgrid/remote/coordinator.py
  3. +18 −5 labgrid/remote/exporter.py
View
@@ -51,10 +51,6 @@ workers:
ticket:
type: dynamic
authenticator: org.labgrid.authenticate
options:
auto_ping_interval: 1000
auto_ping_timeout: 2000
auto_ping_size: 4
components:
- type: class
classname: labgrid.remote.authenticator.AuthenticatorSession
@@ -1,6 +1,7 @@
"""The coordinator module coordinates exported resources and clients accessing them."""
# pylint: disable=no-member
import asyncio
import traceback
from collections import defaultdict
from os import environ
from pprint import pprint
@@ -27,6 +28,7 @@ class RemoteSession:
coordinator = attr.ib()
session = attr.ib()
authid = attr.ib()
version = attr.ib(default="unknown", init=False)
@property
def key(self):
@@ -96,6 +98,7 @@ class CoordinatorComponent(ApplicationSession):
def onConnect(self):
self.sessions = {}
self.places = {}
self.poll_task = None
yield from self.load()
@@ -117,7 +120,7 @@ def onJoin(self, details):
options=RegisterOptions(details_arg='details')
)
# resources
# resources
yield from self.register(
self.set_resource,
'org.labgrid.coordinator.set_resource',
@@ -163,8 +166,59 @@ def onJoin(self, details):
yield from self.register(
self.get_places, 'org.labgrid.coordinator.get_places'
)
self.poll_task = asyncio.get_event_loop().create_task(self.poll())
print("Coordinator ready.")
@asyncio.coroutine
def onLeave(self, details):
if self.poll_task:
self.poll_task.cancel()
yield from asyncio.wait([self.poll_task])
super().onLeave(details)
@asyncio.coroutine
def onDisconnect(self):
if self.poll_task:
self.poll_task.cancel()
yield from asyncio.wait([self.poll_task])
yield from asyncio.sleep(0.5) # give others a chance to clean up
@asyncio.coroutine
def _poll_step(self):
for session in list(self.sessions.values()):
if isinstance(session, ExporterSession):
fut = self.call(
'org.labgrid.exporter.{}.version'.format(session.name)
)
done, pending = yield from asyncio.wait([fut], timeout=5)
if not done:
print('kicking exporter ({}/{})'.format(session.key, session.name))
yield from self.on_session_leave(session.key)
continue
try:
session.version = done.pop().result()
except wamp.exception.ApplicationError as e:
if e.error == "wamp.error.no_such_procedure":
pass # old client
elif e.error == "wamp.error.canceled":
pass # disconnected
else:
raise
@asyncio.coroutine
def poll(self):
loop = asyncio.get_event_loop()
while not loop.is_closed():
try:
yield from asyncio.sleep(15.0)
yield from self._poll_step()
except asyncio.CancelledError:
break
except:
traceback.print_exc()
@asyncio.coroutine
def save(self):
with open('resources.yaml', 'w') as f:
@@ -241,7 +295,7 @@ def on_session_join(self, session_details):
@asyncio.coroutine
def on_session_leave(self, session_id):
pprint(session_id)
print('leave ({})'.format(session_id))
try:
session = self.sessions.pop(session_id)
except KeyError:
@@ -263,13 +317,16 @@ def attach(self, name, details=None):
@asyncio.coroutine
def set_resource(self, groupname, resourcename, resource, details=None):
session = self.sessions.get(details.caller)
if session is None:
return
assert isinstance(session, ExporterSession)
groupname = str(groupname)
resourcename = str(resourcename)
# TODO check if acquired
print(details)
pprint(resource)
session = self.sessions[details.caller]
assert isinstance(session, ExporterSession)
action, resource_path = session.set_resource(groupname, resourcename, resource)
if action is Action.ADD:
self._add_default_place(groupname)
View
@@ -16,6 +16,11 @@
from .config import ResourceConfig
from .common import ResourceEntry, enable_tcp_nodelay
try:
import pkg_resources
__version__ = pkg_resources.get_distribution('labgrid').version
except:
__version__ = "unknown"
def get_free_port():
"""Helper function to always return an unused port."""
@@ -191,6 +196,7 @@ def onConnect(self):
self.name = self.config.extra['name']
self.authid = "exporter/{}".format(self.name)
self.address = self._transport.transport.get_extra_info('sockname')[0]
self.poll_task = None
self.groups = {}
@@ -232,22 +238,25 @@ def onJoin(self, details):
prefix = 'org.labgrid.exporter.{}'.format(self.name)
yield from self.register(self.acquire, '{}.acquire'.format(prefix))
yield from self.register(self.release, '{}.release'.format(prefix))
yield from self.register(self.version, '{}.version'.format(prefix))
@asyncio.coroutine
def onLeave(self, details):
"""Cleanup after leaving the coordinator connection"""
self.poll_task.cancel()
yield from asyncio.wait([self.poll_task])
if self.poll_task:
self.poll_task.cancel()
yield from asyncio.wait([self.poll_task])
super().onLeave(details)
@asyncio.coroutine
def onDisconnect(self):
print("connection lost")
global reexec
reexec = True
self.poll_task.cancel()
yield from asyncio.wait([self.poll_task])
yield from asyncio.sleep(0.5) # give others a chance to clean up
if self.poll_task:
self.poll_task.cancel()
yield from asyncio.wait([self.poll_task])
yield from asyncio.sleep(0.5) # give others a chance to clean up
self.loop.stop()
@asyncio.coroutine
@@ -262,6 +271,10 @@ def release(self, group_name, resource_name):
#resource.release()
yield from self.update_resource(group_name, resource_name)
@asyncio.coroutine
def version(self):
return __version__
@asyncio.coroutine
def _poll_step(self):
for group_name, group in self.groups.items():

0 comments on commit e13ba3d

Please sign in to comment.