Skip to content

Commit

Permalink
Allow user configurable agent timeout for crossbar disconnection (#337)
Browse files Browse the repository at this point in the history
* Add crossbar-timeout to site args via add_arguments()

* Delay reconnection check attempts in same manner as twisted reconnection

* Define signal handlers on disconnect. This allows interrupting agents that
  are running with a crossbar-timeout of 0.

* Fix help text in site config options

* Add --crossbar-timeout to ocs-agent-cli

* yield when calling _stop_all_running_sessions()

As a result, yield also when calling _shutdown().

* Simplify 0 timeout logic

* Add crossbar-timeout at host level in the SCF

* Move signal handler definition to onConnect()

* Document crossbar-timeout

* Increase timeouts in tests

Now that we're properly waiting for Agent shutdown it takes a bit longer. These
times were sufficient when running tests locally.

* Add additional note about how to disable timeout
  • Loading branch information
BrianJKoopman committed Sep 26, 2023
1 parent d75c972 commit 6117ef8
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 36 deletions.
44 changes: 44 additions & 0 deletions docs/user/site_config.rst
Expand Up @@ -180,6 +180,50 @@ and Control Clients (see below).
If you're proceeding in the same terminal don't forget to source your
``.bashrc`` file.

Crossbar Connection Timeout
---------------------------
If an Agent loses connection to the crossbar server it will be unable to
publish any values to its Feeds. By default, the Agent stays online for 10
seconds, waiting to remake the connection to crossbar. If it fails to do so, it
will stop all running processes and shut down.

There may be instances where you would like the Agent to continue running its
Processes, even if the connection to crossbar is lost for some amount of time
or indefinitely. For these cases there is the ``crossbar-timeout`` argument.
This can be set at the Host level, at the individual Agent level, passed on the
commandline, or set via the ``CROSSBAR_TIMEOUT`` environment variable. Setting
the timeout to 0 disables the timeout, allowing the Agent to run indefinitely
without a crossbar connection.

.. note::
A crossbar connection is still required for initial startup of the Agent.

To set at the host level, and override it for individual Agents:

.. code-block:: yaml
hosts:
host-1: {
# Set timeout to 20 seconds for all Agents on this host
'crossbar-timeout': 20,
'agent-instances': [
# crossbar timeout set to 30 seconds
{'agent-class': 'Lakeshore240Agent',
'instance-id': 'thermo1',
'arguments': [['--serial-number', 'LSA11AA'],
['--mode', 'idle'],
['--crossbar-timeout', 30]]},
# crossbar timeout disabled
{'agent-class': 'Lakeshore240Agent',
'instance-id': 'thermo2',
'arguments': [['--serial-number', 'LSA22BB'],
['--mode', 'acq'],
['--crossbar-timeout', 0]]},
]
}
Commandline Arguments
=====================
There are several built in commandline arguments that can be passed to Agents
Expand Down
12 changes: 9 additions & 3 deletions ocs/agent_cli.py
Expand Up @@ -24,7 +24,8 @@
``ocs-agent-cli`` will also inspect environment variables for commonly passed
arguments to facilitate configuration within Docker containers. Supported
arguments include, ``--instance-id`` as ``INSTANCE_ID``, ``--site-hub`` as
``SITE_HUB``, and ``--site-http`` as ``SITE_HTTP``.
``SITE_HUB``, ``--site-http`` as ``SITE_HTTP``, and ``--crossbar-timeout`` as
``CROSSBAR_TIMEOUT``.
``ocs-agent-cli`` relies on the Agent being run belonging to an OCS Plugin. If
the Agent is not an OCS Plugin it can be run directly using both the
Expand All @@ -44,6 +45,10 @@ def _get_parser():
parser.add_argument('--instance-id', default=None, help="Agent unique instance-id. E.g. 'aggregator' or 'fakedata-1'.")
parser.add_argument('--site-hub', default=None, help="Site hub address.")
parser.add_argument('--site-http', default=None, help="Site HTTP address.")
# Default set in site_config.py within add_arguments()
parser.add_argument('--crossbar-timeout', type=int, help="Length of time in seconds "
"that the Agent will try to reconnect to the crossbar server before "
"shutting down. Disable the timeout by setting to 0.")

# Not passed through to Agent
parser.add_argument('--agent', default=None, help="Path to non-plugin OCS Agent.")
Expand Down Expand Up @@ -133,14 +138,15 @@ def main(args=None):
# Format is {"arg name within argparse": "ENVIRONMENT VARIABLE NAME"}
# E.g. --my-new-arg should be {"my_new_arg": "MY_NEW_ARG"}
optional_env = {"site_hub": "SITE_HUB",
"site_http": "SITE_HTTP"}
"site_http": "SITE_HTTP",
"crossbar_timeout": "CROSSBAR_TIMEOUT"}

for _name, _var in optional_env.items():
# Args passed on cli take priority
_arg = vars(pre_args)[_name]
_flag = f"--{_name}".replace('_', '-')
if _arg is not None:
post_args.extend([_flag, _arg])
post_args.extend([_flag, str(_arg)])
continue

# Get from ENV if not passed w/flag
Expand Down
58 changes: 46 additions & 12 deletions ocs/ocs_agent.py
Expand Up @@ -3,6 +3,7 @@
import txaio
txaio.use_twisted()

from twisted.application.internet import backoffPolicy
from twisted.internet import reactor, task, threads
from twisted.internet.defer import inlineCallbacks, Deferred, DeferredList, FirstError, maybeDeferred
from twisted.internet.error import ReactorNotRunning
Expand All @@ -21,6 +22,7 @@
import math
import time
import datetime
import signal
import socket
import os
from ocs import client_t
Expand Down Expand Up @@ -135,6 +137,7 @@ def __init__(self, config, site_args, address=None, class_name=None):
@inlineCallbacks
def _stop_all_running_sessions(self):
"""Stops all currently running sessions."""
self.log.info('Stopping all running sessions')
for session in self.sessions:
if self.sessions[session] is not None:
self.log.info("Stopping session {sess}", sess=session)
Expand All @@ -154,6 +157,15 @@ def _stop_all_running_sessions(self):
"""

def onConnect(self):
# Define signal handlers
@inlineCallbacks
def signal_handler(sig, frame):
self.log.info('caught {signal}!', signal=signal.Signals(sig).name)
yield self._shutdown()

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

self.log.info('transport connected')
self.join(self.config.realm)

Expand Down Expand Up @@ -222,45 +234,67 @@ def _subscribe_fail(*args, **kwargs):

self.realm_joined = True

# Handler run when the session leaves the realm: stops the heartbeat, stops
# running sessions on a normal close, disconnects, and clears the
# subscription/realm state.
# NOTE(review): this listing is a flattened diff -- indentation is lost and the
# removed and added versions of a changed line appear back to back.
@inlineCallbacks
def onLeave(self, details):
self.log.info('session left: {}'.format(details))
# heartbeat_call can be None, so only stop it when it has been set
if self.heartbeat_call is not None:
self.heartbeat_call.stop()

# Normal shutdown
if details.reason == "wamp.close.normal":
# The next two lines look like the old (bare call) and new (yielded) forms
# of the same statement; yielding makes this handler wait for the sessions
# to stop before disconnecting -- TODO confirm against the real file.
self._stop_all_running_sessions()
yield self._stop_all_running_sessions()

self.disconnect()

# Unsub from all topics, since we've left the realm
self.subscribed_topics = set()
self.realm_joined = False

# Stops every running session, waits for that to finish, then stops the
# reactor. Called from the signal handler and from the disconnect-timeout path.
@inlineCallbacks
def _shutdown(self):
# Stop all sessions and then stop the reactor
yield self._stop_all_running_sessions()
try:
self.log.info('stopping reactor')
reactor.stop()
# The reactor may already be stopped; that case is deliberately ignored
except ReactorNotRunning:
pass

# Handler run when the transport disconnects. Polls ``self.realm_joined`` for
# up to ``site_args.crossbar_timeout`` seconds and shuts the Agent down if the
# realm has not been rejoined by then. A timeout of 0 returns immediately, so
# no shutdown is ever scheduled and the Agent keeps running.
# NOTE(review): this listing is a flattened diff -- indentation is lost and the
# removed and added versions of changed lines appear back to back.
@inlineCallbacks
def onDisconnect(self):
self.log.info('transport disconnected')
self.log.info('waiting for reconnection')

# Wait to see if we reconnect before stopping the reactor
# (the hard-coded 10 looks like the removed line; the site_args lookup below
# appears to be its replacement -- TODO confirm)
timeout = 10
timeout = self.site_args.crossbar_timeout
# NOTE(review): if crossbar_timeout were None the `<` comparison in the loop
# below would raise TypeError -- presumably a default is always filled in
# upstream; verify.

# Wait forever
if timeout == 0:
return

# compute_delay(attempts): Delay in seconds given number of attempts
# Twisted has an exponential backoff interval that prevents flooding
# reconnect attempts. We follow that roughly for checking the Agent has
# reconnected to crossbar up to a 30 second delay.
compute_delay = backoffPolicy(maxDelay=30.0)

# Disconnect after timeout
disconnected_at = time.time()
attempt = 0
while time.time() - disconnected_at < timeout:
attempt += 1 # twisted also starts at 1 attempt

# successful reconnection
if self.realm_joined:
self.log.info('realm rejoined')
return

time_left = timeout - (time.time() - disconnected_at)
# Old form: fixed 1 second poll. New form (the three lines after it): the
# sleep comes from compute_delay, so the wait may overshoot time_left --
# hence the "at least" wording in the log message.
self.log.info('waiting for reconnect for {} more seconds'.format(time_left))
yield dsleep(1)
self.log.info('waiting at least {} more seconds before giving up'.format(time_left))
delay = compute_delay(attempt)
yield dsleep(delay)

# shutdown after timeout expires
# (the inline stop-sessions/stop-reactor sequence below looks like the removed
# code; the single yielded _shutdown() call at the end appears to replace it)
self._stop_all_running_sessions()
try:
self.log.info('stopping reactor')
reactor.stop()
except ReactorNotRunning:
pass
# Shutdown after timeout expires
yield self._shutdown()

"""The methods below provide OCS framework support."""

Expand Down
48 changes: 30 additions & 18 deletions ocs/site_config.py
Expand Up @@ -72,6 +72,7 @@ def __init__(self, name=None):
self.agent_paths = []
self.log_dir = None
self.working_dir = os.getcwd()
self.crossbar_timeout = None

@classmethod
def from_dict(cls, data, parent=None, name=None):
Expand Down Expand Up @@ -111,6 +112,7 @@ def from_dict(cls, data, parent=None, name=None):
self.agent_paths = data.get('agent-paths', [])
self.crossbar = CrossbarConfig.from_dict(data.get('crossbar'))
self.log_dir = data.get('log-dir', None)
self.crossbar_timeout = data.get('crossbar_timeout', 10)
return self


Expand Down Expand Up @@ -420,28 +422,36 @@ def add_arguments(parser=None):
``--working-dir=...``:
Propagate the working directory.
``--crossbar-timeout=...``:
Length of time in seconds that the Agent will try to reconnect to the
crossbar server before shutting down.
"""
if parser is None:
parser = argparse.ArgumentParser()
group = parser.add_argument_group('Site Config Options')
group.add_argument('--site', help="""Instead of the default site, use the configuration for the
specified site. The configuration is loaded from
``$OCS_CONFIG_DIR/{site}.yaml``. If ``--site=none``, the
site_config facility will not be used at all.""")
group.add_argument('--site-file', help="""Instead of the default site config, use the specified file. Full
path must be specified.""")
group.add_argument('--site-host', help="""Override the OCS determination of what host this instance is
running on, and instead use the configuration for the indicated
host.""")
group.add_argument('--site-hub', help="""Override the ocs hub url (wamp_server).""")
group.add_argument('--site-http', help="""Override the ocs hub http url (wamp_http).""")
group.add_argument('--site-realm', help="""Override the ocs hub realm (wamp_realm).""")
group.add_argument('--instance-id', help="""Look in the SCF for Agent-instance specific configuration options,
and use those to launch the Agent.""")
group.add_argument('--address-root', help="""Override the site default address root.""")
group.add_argument('--registry-address', help="""Deprecated.""")
group.add_argument('--log-dir', help="""Set the logging directory.""")
group.add_argument('--working-dir', help="""Propagate the working directory.""")
group.add_argument('--site', help="Instead of the default site, use the "
"configuration for the specified site. The configuration is loaded "
"from ``$OCS_CONFIG_DIR/{site}.yaml``. If ``--site=none``, the "
"site_config facility will not be used at all.")
group.add_argument('--site-file', help="Instead of the default site config, "
"use the specified file. Full path must be specified.")
group.add_argument('--site-host', help="Override the OCS determination of "
"what host this instance is running on, and instead use the "
"configuration for the indicated host.")
group.add_argument('--site-hub', help="Override the ocs hub url (wamp_server).")
group.add_argument('--site-http', help="Override the ocs hub http url (wamp_http).")
group.add_argument('--site-realm', help="Override the ocs hub realm (wamp_realm).")
group.add_argument('--instance-id', help="Look in the SCF for "
"Agent-instance specific configuration options, and use those to launch "
"the Agent.")
group.add_argument('--address-root', help="Override the site default address root.")
group.add_argument('--registry-address', help="Deprecated.")
group.add_argument('--log-dir', help="Set the logging directory.")
group.add_argument('--working-dir', help="Propagate the working directory.")
group.add_argument('--crossbar-timeout', type=int, help="Length of time in seconds "
"that the Agent will try to reconnect to the crossbar server before "
"shutting down. Note this is set per Agent in an instance's arguments list.")
return parser


Expand Down Expand Up @@ -568,6 +578,8 @@ def add_site_attributes(args, site, host=None):
args.address_root = site.hub.data['address_root']
if (args.log_dir is None) and (host is not None):
args.log_dir = host.log_dir
if (args.crossbar_timeout is None) and (host is not None):
args.crossbar_timeout = host.crossbar_timeout


@deprecation.deprecated(deprecated_in='v0.6.0',
Expand Down
2 changes: 1 addition & 1 deletion ocs/testing.py
Expand Up @@ -11,7 +11,7 @@
from ocs.ocs_client import OCSClient


SIGINT_TIMEOUT = 5
SIGINT_TIMEOUT = 10


def create_agent_runner_fixture(agent_path, agent_name, args=None):
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_crossbar_integration.py
Expand Up @@ -164,8 +164,8 @@ def test_proper_agent_shutdown_on_lost_transport(wait_for_crossbar):
crossbar_container = client.containers.get('ocs-tests-crossbar')
crossbar_container.stop()

# 15 seconds should be enough with default 10 second timeout
timeout = 15
# 25 seconds should be enough with default 10 second timeout
timeout = 25
while timeout > 0:
time.sleep(1) # give time for the fake-data-agent to timeout, then shutdown
fake_data_container = client.containers.get('ocs-tests-fake-data-agent')
Expand Down

0 comments on commit 6117ef8

Please sign in to comment.