diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst index 0cfa8873..36f6aeef 100644 --- a/doc/source/configuration.rst +++ b/doc/source/configuration.rst @@ -118,8 +118,9 @@ for the Dell PowerConnect device:: ngs_switchport_mode = access Dell PowerConnect devices have been seen to have issues with multiple -concurrent configuration sessions. See :ref:`synchronization` for details on -how to limit the number of concurrent active connections to each device. +concurrent configuration sessions. See :ref:`synchronization` and +:ref:`batching` for details on how to limit the number of concurrent active +connections to each device. for the Brocade FastIron (ICX) device:: @@ -201,8 +202,16 @@ connection URL for the backend should be configured as follows:: [ngs_coordination] backend_url = -The default is to limit the number of concurrent active connections to each -device to one, but the number may be configured per-device as follows:: +The backend URL format includes the Tooz driver as the scheme, with driver +options passed using query string parameters. For example, to use the +``etcd3gw`` driver with an API version of ``v3`` and a path to a CA +certificate:: + + [ngs_coordination] + backend_url = etcd3+https://etcd.example.com?api_version=v3,ca_cert=/path/to/ca/cert.crt + +The default behaviour is to limit the number of concurrent active connections +to each device to one, but the number may be configured per-device as follows:: [genericswitch:device-hostname] ngs_max_connections = @@ -216,6 +225,35 @@ timeout of 60 seconds before failing. This timeout can be configured as follows ... acquire_timeout = +.. _batching: + +Batching +======== + +For many network devices there is a significant SSH connection overhead which +is incurred for each network or port configuration change. In a large scale +system with many concurrent changes, this overhead adds up quickly. Since the +Antelope release, the Generic Switch driver includes support to batch up switch +configuration changes and apply them together using a single SSH connection. + +This is implemented using etcd as a queueing system. Commands are added +to an input key, then a worker thread processes the available commands +for a particular switch device. We pull off the queue using the version +at which the keys were added, giving a FIFO style queue. The result of +each command set are added to an output key, which the original request +thread is watching. Distributed locks are used to serialise the +processing of commands for each switch device. + +The etcd endpoint is configured using the same ``[ngs_coordination] +backend_url`` option used in :ref:`synchronization`, with the limitation that +only ``etcd3gw`` is supported. + +Additionally, each device that will use batched configuration should include +the following option:: + + [genericswitch:device-hostname] + ngs_batch_requests = True + Disabling Inactive Ports ======================== diff --git a/networking_generic_switch/batching.py b/networking_generic_switch/batching.py new file mode 100644 index 00000000..df765a14 --- /dev/null +++ b/networking_generic_switch/batching.py @@ -0,0 +1,451 @@ +# Copyright 2023 StackHPC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import atexit +import json + +import etcd3gw +from etcd3gw import exceptions as etcd3gw_exc +from etcd3gw.utils import _decode +from etcd3gw.utils import _encode +from etcd3gw.utils import _increment_last_byte +import eventlet +from oslo_log import log as logging +from oslo_utils import netutils +from oslo_utils import uuidutils +import tenacity + +from networking_generic_switch import exceptions as exc + +SHUTDOWN_TIMEOUT = 60 + +LOG = logging.getLogger(__name__) + +THREAD_POOL = eventlet.greenpool.GreenPool() + + +class ShutdownTimeout(Exception): + """Exception raised when shutdown timeout is exceeded.""" + + +@atexit.register +def _wait_for_threads(): + """Wait for all threads in the pool to complete. + + This function is registered to execute at exit, to ensure that all worker + threads have completed. These threads may be holding switch execution locks + and performing switch configuration operations which should not be + interrupted. + """ + LOG.info("Waiting %d seconds for %d threads to complete", + SHUTDOWN_TIMEOUT, THREAD_POOL.running()) + try: + with eventlet.Timeout(SHUTDOWN_TIMEOUT, ShutdownTimeout): + THREAD_POOL.waitall() + except ShutdownTimeout: + LOG.error("Timed out waiting for threads to complete") + else: + LOG.info("Finished waiting for threads to complete") + + +class SwitchQueueItem(object): + """An item in the queue.""" + + def __init__(self, uuid, create_revision): + self.uuid = uuid + self.create_revision = create_revision + + +class SwitchQueue(object): + INPUT_PREFIX = "/ngs/batch/%s/input/" + INPUT_ITEM_KEY = "/ngs/batch/%s/input/%s" + RESULT_ITEM_KEY = "/ngs/batch/%s/output/%s" + EXEC_LOCK = "/ngs/batch/%s/execute_lock" + + def __init__(self, switch_name, etcd_client): + self.switch_name = switch_name + self.client = etcd_client + self.lease_ttl = 600 + + def add_batch(self, cmds): + """Clients add batch, given key events. + + Each batch is given an uuid that is used to generate both + and input and result key in etcd. + + First we watch for any results, second we write the input + in a location that the caller of get_batches will be looking. + + No locks are required when calling this function to send work + to the workers, and start waiting for results. + + :param cmds: an iterable of commands + :return: a SwitchQueueItem object + """ + + uuid = uuidutils.generate_uuid() + result_key = self.RESULT_ITEM_KEY % (self.switch_name, uuid) + input_key = self.INPUT_ITEM_KEY % (self.switch_name, uuid) + + batch = { + "uuid": uuid, + "input_key": input_key, + "result_key": result_key, + "cmds": cmds, + } + value = json.dumps(batch, sort_keys=True).encode("utf-8") + lease = self.client.lease(ttl=self.lease_ttl) + # Use a transaction rather than create() in order to extract the + # create revision. + base64_key = _encode(input_key) + base64_value = _encode(value) + txn = { + 'compare': [{ + 'key': base64_key, + 'result': 'EQUAL', + 'target': 'CREATE', + 'create_revision': 0 + }], + 'success': [{ + 'request_put': { + 'key': base64_key, + 'value': base64_value, + } + }], + 'failure': [] + } + txn['success'][0]['request_put']['lease'] = lease.id + result = self.client.transaction(txn) + + success = result.get('succeeded', False) + # Be sure to free watcher resources on error + if not success: + raise exc.GenericSwitchBatchError( + device=self.switch_name, + error="Failed to add batch to input key: %s" % input_key) + + put_response = result['responses'][0]['response_put'] + create_revision = put_response['header']['revision'] + LOG.debug("written input key %s revision %s", + input_key, create_revision) + + return SwitchQueueItem(uuid, create_revision) + + def wait_for_result(self, item, timeout): + """Wait for the result of a command batch. + + :param item: SwitchQueueItem object returned by add_batch + :param timeout: wait timeout in seconds + :return: output string generated by this command set + :raises: Exception if waiting times out or the command batch was + unsuccessful + """ + result_key = self.RESULT_ITEM_KEY % (self.switch_name, item.uuid) + try: + event = self.client.watch_once(result_key, timeout=timeout, + start_revision=item.create_revision) + except etcd3gw_exc.WatchTimedOut: + raise exc.GenericSwitchBatchError( + device=self.switch_name, + error="Timed out waiting for result key: %s" % result_key) + + LOG.debug("got event: %s", event) + if event["kv"]["version"] == 0: + raise exc.GenericSwitchBatchError( + device=self.switch_name, + error="Output key was deleted, perhaps lease expired") + # TODO(johngarbutt) check we have the create event and result? + result_dict = self._get_and_delete_result(result_key) + LOG.debug("got result: %s", result_dict) + if "result" in result_dict: + return result_dict["result"] + else: + raise exc.GenericSwitchBatchError( + device=self.switch_name, + error=result_dict["error"]) + + def _get_and_delete_result(self, result_key): + # called when watch event says the result key should exist + txn = { + 'compare': [], + 'success': [{ + 'request_delete_range': { + 'key': _encode(result_key), + 'prev_kv': True, + } + }], + 'failure': [] + } + result = self.client.transaction(txn) + success = result.get('succeeded', False) + if not success: + raise exc.GenericSwitchBatchError( + device=self.switch_name, + error="Unable to find result: %s" % result_key) + delete_response = result['responses'][0]['response_delete_range'] + raw_value = delete_response['prev_kvs'][0]['value'] + result_dict = json.loads(_decode(raw_value)) + LOG.debug("fetched and deleted result for: %s", result_key) + return result_dict + + def _get_raw_batches(self, max_create_revision=None): + input_prefix = self.INPUT_PREFIX % self.switch_name + # Sort order ensures FIFO style queue + # Use get rather than get_prefix since get accepts max_create_revision. + range_end = _encode(_increment_last_byte(input_prefix)) + raw_batches = self.client.get(input_prefix, + metadata=True, + range_end=range_end, + sort_order="ascend", + sort_target="create", + max_create_revision=max_create_revision) + return raw_batches + + def get_batches(self, item=None): + """Return a list of the event dicts written in wait for result. + + This is called both with or without getting a lock to get the + latest list of work that has send to the per switch queue in + etcd. + + :param item: Optional SwitchQueueItem object. If provided, only batches + added up to and including this item are returned. + """ + max_create_revision = item.create_revision if item else None + raw_batches = self._get_raw_batches(max_create_revision) + LOG.debug("found %s batches", len(raw_batches)) + + batches = [] + for raw_value, metadata in raw_batches: + batch = json.loads(raw_value.decode('utf-8')) + batches.append(batch) + return batches + + def record_result(self, batch): + """Record the result from executing given command set. + + We assume that a lock is held before getting a fresh list + of batches, executing them, and then calling this record + results function, before finally dropping the lock. + """ + # Write results and delete input keys so the next worker to hold the + # lock knows not to execute these batches + lease = self.client.lease(ttl=self.lease_ttl) + result_value = json.dumps(batch, sort_keys=True).encode('utf-8') + txn = { + 'compare': [], + 'success': [ + { + 'request_put': { + 'key': _encode(batch['result_key']), + 'value': _encode(result_value), + 'lease': lease.id, + } + }, + { + 'request_delete_range': { + 'key': _encode(batch['input_key']), + } + } + ], + 'failure': [] + } + result = self.client.transaction(txn) + success = result.get('succeeded', False) + if not success: + LOG.error("failed to report batch result for: %s", + batch) + else: + LOG.debug("written result key: %s", batch['result_key']) + + def acquire_worker_lock(self, item, acquire_timeout=300, lock_ttl=120, + wait=None): + """Wait for lock needed to call record_result. + + This blocks until the work queue is empty of the switch lock is + acquired. If we timeout waiting for the lock we raise an exception. + """ + lock_name = self.EXEC_LOCK % self.switch_name + lock = self.client.lock(lock_name, lock_ttl) + + if wait is None: + wait = tenacity.wait_random(min=1, max=3) + + @tenacity.retry( + # Log a message after each failed attempt. + after=tenacity.after_log(LOG, logging.DEBUG), + # Retry if we haven't got the lock yet + retry=tenacity.retry_if_result(lambda x: x is False), + # Stop after timeout. + stop=tenacity.stop_after_delay(acquire_timeout), + # Wait between lock retries + wait=wait, + ) + def _acquire_lock_with_retry(): + lock_acquired = lock.acquire() + if lock_acquired: + return lock + + # Stop waiting for the lock if there is nothing to do + work = self._get_raw_batches(item.create_revision) + if not work: + return None + + # Trigger a retry + return False + + return _acquire_lock_with_retry() + + +class SwitchBatch(object): + def __init__(self, switch_name, etcd_url=None, switch_queue=None): + if switch_queue is None: + parsed_url = netutils.urlsplit(etcd_url) + host = parsed_url.hostname + port = parsed_url.port + protocol = 'https' if parsed_url.scheme.endswith( + 'https') else 'http' + # Use the same parameter format as tooz etcd3gw driver. + params = parsed_url.params() + ca_cert = params.get('ca_cert') + cert_key = params.get('cert_key') + cert_cert = params.get('cert_cert') + api_version = params.get('api_version', 'v3alpha') + etcd_client = etcd3gw.Etcd3Client( + host=host, port=port, protocol=protocol, + ca_cert=ca_cert, cert_key=cert_key, cert_cert=cert_cert, + api_path='/' + api_version + '/', + timeout=30) + self.queue = SwitchQueue(switch_name, etcd_client) + else: + self.queue = switch_queue + self.switch_name = switch_name + + def do_batch(self, device, cmd_set, timeout=300): + """Batch up switch configuration commands to reduce overheads. + + We collect together the iterables in the cmd_set, and + execute them toegether in a single larger batch. + + :param device: a NetmikoSwitch device object + :param cmd_set: an iterable of commands + :return: output string generated by this command set + """ + + # request that the cmd_set by executed + cmd_list = list(cmd_set) + item = self.queue.add_batch(cmd_list) + + def do_work(): + try: + self._execute_pending_batches(device, item) + except Exception as e: + LOG.error("failed to run execute batch: %s", e, + exec_info=True) + raise + + self._spawn(do_work) + + # Wait for our result key + # as the result might be done before the above task starts + output = self.queue.wait_for_result(item, timeout) + LOG.debug("Got batch result: %s", output) + return output + + @staticmethod + def _spawn(work_fn): + # TODO(johngarbutt) remove hard eventlet dependency + # in a similar way to etcd3gw + # Sleep to let possible other work to batch together + eventlet.sleep(0) + # Run all pending tasks, which might be a no op + # if pending tasks already ran + THREAD_POOL.spawn_n(work_fn) + + def _execute_pending_batches(self, device, item): + """Execute all batches currently registered. + + Typically called by every caller of add_batch. + Could be a noop if all batches are already executed. + + :param device: a NetmikoSwitch device object + :param item: a SwitchQueueItem object + """ + batches = self.queue.get_batches(item) + if not batches: + LOG.debug("Skipped execution for %s", self.switch_name) + return + LOG.debug("Found %d batches - trying to acquire lock for %s", + len(batches), self.switch_name) + + # Many workers can end up piling up here trying to acquire the + # lock. Only consider batches at least as old as the one that triggered + # this worker, to ensure they don't wait forever. + lock = self.queue.acquire_worker_lock(item) + if lock is None: + # This means we stopped waiting as the work queue was empty + LOG.debug("Work list empty for %s", self.switch_name) + return + + # Check we got the lock + if not lock.is_acquired(): + raise exc.GenericSwitchBatchError( + device=self.switch_name, + error="unable to get lock for: %s" % self.switch_name) + + # be sure to drop the lock when we are done + try: + LOG.debug("got lock for %s", self.switch_name) + + # Fetch fresh list now we have the lock + # and order the list so we execute in order added + batches = self.queue.get_batches() + if not batches: + LOG.debug("No batches to execute %s", self.switch_name) + return + + LOG.debug("Starting to execute %d batches", len(batches)) + self._send_commands(device, batches, lock) + finally: + lock.release() + + LOG.debug("end of lock for %s", self.switch_name) + + def _send_commands(self, device, batches, lock): + with device._get_connection() as net_connect: + for batch in batches: + try: + output = device.send_config_set(net_connect, batch['cmds']) + batch["result"] = output + except Exception as e: + batch["error"] = str(e) + + # The switch configuration can take a long time, and may exceed + # the lock TTL. Periodically refresh our lease, and verify that + # we still own the lock before recording the results. + lock.refresh() + if not lock.is_acquired(): + raise exc.GenericSwitchBatchError( + device=self.switch_name, + error="Worker aborting - lock timed out") + + # Tell request watchers the result and + # tell workers which batches have now been executed + self.queue.record_result(batch) + + try: + device.save_configuration(net_connect) + except Exception: + LOG.exception("Failed to save configuration") + # Probably not worth failing all batches for this. diff --git a/networking_generic_switch/devices/__init__.py b/networking_generic_switch/devices/__init__.py index c08803cd..acd59789 100644 --- a/networking_generic_switch/devices/__init__.py +++ b/networking_generic_switch/devices/__init__.py @@ -42,7 +42,11 @@ {'name': 'ngs_network_name_format', 'default': '{network_id}'}, # If false, ngs will not add and delete VLANs from switches {'name': 'ngs_manage_vlans', 'default': True}, - {'name': 'vlan_translation_supported', 'default': False} + {'name': 'vlan_translation_supported', 'default': False}, + # If False, ngs will skip saving configuration on devices + {'name': 'ngs_save_configuration', 'default': True}, + # When true try to batch up in flight switch requests + {'name': 'ngs_batch_requests', 'default': False}, ] @@ -137,6 +141,11 @@ def _do_vlan_management(self): """Check if drivers should add and remove VLANs from switches.""" return strutils.bool_from_string(self.ngs_config['ngs_manage_vlans']) + def _batch_requests(self): + """Return whether to batch up requests to the switch.""" + return strutils.bool_from_string( + self.ngs_config['ngs_batch_requests']) + @abc.abstractmethod def add_network(self, segmentation_id, network_id): pass diff --git a/networking_generic_switch/devices/netmiko_devices/__init__.py b/networking_generic_switch/devices/netmiko_devices/__init__.py index 9bf70606..c4d16f60 100644 --- a/networking_generic_switch/devices/netmiko_devices/__init__.py +++ b/networking_generic_switch/devices/netmiko_devices/__init__.py @@ -24,6 +24,7 @@ import tenacity from tooz import coordination +from networking_generic_switch import batching from networking_generic_switch import devices from networking_generic_switch.devices import utils as device_utils from networking_generic_switch import exceptions as exc @@ -117,20 +118,32 @@ def __init__(self, device_cfg): self.config['session_log_record_writes'] = True self.config['session_log_file_mode'] = 'append' + self.lock_kwargs = { + 'locks_pool_size': int(self.ngs_config['ngs_max_connections']), + 'locks_prefix': self.config.get( + 'host', '') or self.config.get('ip', ''), + 'timeout': CONF.ngs_coordination.acquire_timeout} + self.locker = None - if CONF.ngs_coordination.backend_url: + self.batch_cmds = None + if self._batch_requests(): + if not CONF.ngs_coordination.backend_url: + raise exc.GenericSwitchNetmikoConfigError( + config=device_utils.sanitise_config(self.config), + error="ngs_batch_requests is true but [ngs_coordination] " + "backend_url is not provided") + # NOTE: we skip the lock if we are batching requests + self.locker = None + switch_name = self.lock_kwargs['locks_prefix'] + self.batch_cmds = batching.SwitchBatch( + switch_name, CONF.ngs_coordination.backend_url) + elif CONF.ngs_coordination.backend_url: self.locker = coordination.get_coordinator( CONF.ngs_coordination.backend_url, ('ngs-' + device_utils.get_hostname()).encode('ascii')) self.locker.start() atexit.register(self.locker.stop) - self.lock_kwargs = { - 'locks_pool_size': int(self.ngs_config['ngs_max_connections']), - 'locks_prefix': self.config.get( - 'host', '') or self.config.get('ip', ''), - 'timeout': CONF.ngs_coordination.acquire_timeout} - def _format_commands(self, commands, **kwargs): if not commands: return [] @@ -192,6 +205,12 @@ def send_commands_to_device(self, cmd_set): LOG.debug("Nothing to execute") return + # If configured, batch up requests to the switch + if self.batch_cmds is not None: + return self.batch_cmds.do_batch(self, cmd_set) + return self._send_commands_to_device(cmd_set) + + def _send_commands_to_device(self, cmd_set): try: with ngs_lock.PoolLock(self.locker, **self.lock_kwargs): with self._get_connection() as net_connect: diff --git a/networking_generic_switch/exceptions.py b/networking_generic_switch/exceptions.py index edadfb5d..27f4de8e 100644 --- a/networking_generic_switch/exceptions.py +++ b/networking_generic_switch/exceptions.py @@ -54,3 +54,7 @@ class GenericSwitchNetmikoConfigError(GenericSwitchException): class GenericSwitchNotSupported(GenericSwitchException): message = _("Requested feature is not supported by " "networking-generic-switch. %(error)s") + + +class GenericSwitchBatchError(GenericSwitchException): + message = _("Batching error: %(device)s, error: %(error)s") diff --git a/networking_generic_switch/tests/unit/netmiko/test_netmiko_base.py b/networking_generic_switch/tests/unit/netmiko/test_netmiko_base.py index 078441b2..6662e4f1 100644 --- a/networking_generic_switch/tests/unit/netmiko/test_netmiko_base.py +++ b/networking_generic_switch/tests/unit/netmiko/test_netmiko_base.py @@ -48,6 +48,15 @@ def _make_switch_device(self, extra_cfg={}): class TestNetmikoSwitch(NetmikoSwitchTestBase): + def test_batch(self): + self.cfg.config(backend_url='url', group='ngs_coordination') + self._make_switch_device({'ngs_batch_requests': True}) + + def test_batch_missing_backend_url(self): + self.assertRaisesRegex( + Exception, "backend_url", + self._make_switch_device, {'ngs_batch_requests': True}) + @mock.patch('networking_generic_switch.devices.netmiko_devices.' 'NetmikoSwitch.send_commands_to_device', return_value='fake output') diff --git a/networking_generic_switch/tests/unit/test_batching.py b/networking_generic_switch/tests/unit/test_batching.py new file mode 100644 index 00000000..868186a8 --- /dev/null +++ b/networking_generic_switch/tests/unit/test_batching.py @@ -0,0 +1,442 @@ +# Copyright 2023 StackHPC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from unittest import mock + +from etcd3gw.exceptions import Etcd3Exception +from etcd3gw.utils import _encode +from etcd3gw.utils import _increment_last_byte +import fixtures +from oslo_config import fixture as config_fixture +from oslo_utils import uuidutils +import tenacity + +from networking_generic_switch import batching +from networking_generic_switch import exceptions as exc + + +class SwitchQueueTest(fixtures.TestWithFixtures): + def setUp(self): + super(SwitchQueueTest, self).setUp() + self.cfg = self.useFixture(config_fixture.Config()) + + self.client = mock.Mock() + self.switch_name = "switch1" + self.queue = batching.SwitchQueue(self.switch_name, self.client) + + @mock.patch.object(uuidutils, "generate_uuid") + def test_add_batch(self, mock_uuid): + mock_uuid.return_value = "uuid" + self.client.transaction.return_value = { + "succeeded": True, + "responses": [{ + "response_put": { + "header": { + "revision": 42 + } + } + }] + } + + item = self.queue.add_batch(["cmd1", "cmd2"]) + + self.assertEqual("uuid", item.uuid) + self.assertEqual(42, item.create_revision) + input_key = '/ngs/batch/switch1/input/uuid' + expected_value = ( + b'{"cmds": ["cmd1", "cmd2"], ' + b'"input_key": "/ngs/batch/switch1/input/uuid", ' + b'"result_key": "/ngs/batch/switch1/output/uuid", ' + b'"uuid": "uuid"}') + expected_txn = { + 'compare': [{ + 'key': _encode(input_key), + 'result': 'EQUAL', + 'target': 'CREATE', + 'create_revision': 0 + }], + 'success': [{ + 'request_put': { + 'key': _encode(input_key), + 'value': _encode(expected_value), + 'lease': mock.ANY, + } + }], + 'failure': [] + } + self.client.transaction.assert_called_once_with(expected_txn) + + @mock.patch.object(uuidutils, "generate_uuid") + def test_add_batch_failure(self, mock_uuid): + mock_uuid.return_value = "uuid" + self.client.transaction.side_effect = Etcd3Exception + + self.assertRaises(Etcd3Exception, + self.queue.add_batch, ["cmd1", "cmd2"]) + + @mock.patch.object(uuidutils, "generate_uuid") + def test_add_batch_failure2(self, mock_uuid): + mock_uuid.return_value = "uuid" + self.client.transaction.return_value = {"succeeded": False} + + self.assertRaises(exc.GenericSwitchBatchError, + self.queue.add_batch, ["cmd1", "cmd2"]) + + @mock.patch.object(batching.SwitchQueue, "_get_and_delete_result") + def test_wait_for_result(self, mock_get): + event = { + "kv": { + "version": 1 + } + } + self.client.watch_once.return_value = event + mock_get.return_value = {"result": "result1"} + item = batching.SwitchQueueItem("uuid", 42) + + result = self.queue.wait_for_result(item, 43) + + self.assertEqual("result1", result) + result_key = '/ngs/batch/switch1/output/uuid' + self.client.watch_once.assert_called_once_with( + result_key, timeout=43, start_revision=42) + mock_get.assert_called_once_with(result_key) + + def test_get_and_delete_result(self): + self.client.transaction.return_value = { + "succeeded": True, + "responses": [{ + "response_delete_range": { + "prev_kvs": [{ + "value": _encode(b'{"foo": "bar"}') + }] + } + }] + } + + result = self.queue._get_and_delete_result(b"result_key") + + self.assertEqual({"foo": "bar"}, result) + + expected_txn = { + 'compare': [], + 'success': [{ + 'request_delete_range': { + 'key': _encode(b"result_key"), + 'prev_kv': True, + } + }], + 'failure': [] + } + self.client.transaction.assert_called_once_with(expected_txn) + + def test_get_and_delete_result_failure(self): + self.client.transaction.return_value = {"succeeded": False} + + self.assertRaises(exc.GenericSwitchBatchError, + self.queue._get_and_delete_result, b"result_key") + + def test_get_batches(self): + self.client.get.return_value = [ + (b'{"foo": "bar"}', {}), + (b'{"foo1": "bar1"}', {}) + ] + + batches = self.queue.get_batches() + + self.assertEqual([ + {"foo": "bar"}, + {"foo1": "bar1"} + ], batches) + input_prefix = '/ngs/batch/switch1/input/' + self.client.get.assert_called_once_with( + input_prefix, + metadata=True, + range_end=_encode(_increment_last_byte(input_prefix)), + sort_order='ascend', sort_target='create', + max_create_revision=None) + + def test_get_batches_with_item(self): + self.client.get.return_value = [ + (b'{"foo": "bar"}', {}), + (b'{"foo1": "bar1"}', {}) + ] + item = batching.SwitchQueueItem("uuid", 42) + + batches = self.queue.get_batches(item) + + self.assertEqual([ + {"foo": "bar"}, + {"foo1": "bar1"} + ], batches) + input_prefix = '/ngs/batch/switch1/input/' + self.client.get.assert_called_once_with( + input_prefix, + metadata=True, + range_end=_encode(_increment_last_byte(input_prefix)), + sort_order='ascend', sort_target='create', + max_create_revision=42) + + def test_record_result(self): + self.client.transaction.return_value = {"succeeded": True} + batch = {"result_key": "result1", "input_key": "input1", + "result": "asdf"} + + self.queue.record_result(batch) + + self.client.lease.assert_called_once_with(ttl=600) + expected_value = ( + b'{"input_key": "input1", ' + b'"result": "asdf", "result_key": "result1"}') + expected_txn = { + 'compare': [], + 'success': [ + { + 'request_put': { + 'key': _encode("result1"), + 'value': _encode(expected_value), + 'lease': mock.ANY, + } + }, + { + 'request_delete_range': { + 'key': _encode("input1"), + } + } + ], + 'failure': [] + } + self.client.transaction.assert_called_once_with(expected_txn) + + def test_record_result_failure(self): + self.client.transaction.return_value = {"succeeded": False} + batch = {"result_key": "result1", "input_key": "input1", + "result": "asdf"} + + # Should not raise an exception. + self.queue.record_result(batch) + + @mock.patch.object(batching.SwitchQueue, "_get_raw_batches") + def test_acquire_worker_lock_timeout(self, mock_get): + mock_get.return_value = ["work"] + lock = mock.MagicMock() + lock.acquire.return_value = False + self.client.lock.return_value = lock + item = batching.SwitchQueueItem("uuid", 42) + + wait = tenacity.wait_none() + self.assertRaises( + tenacity.RetryError, + self.queue.acquire_worker_lock, + item, wait=wait, acquire_timeout=0.05) + + @mock.patch.object(batching.SwitchQueue, "_get_raw_batches") + def test_acquire_worker_lock_no_work(self, mock_get): + mock_get.side_effect = [["work"], None] + lock = mock.MagicMock() + lock.acquire.return_value = False + self.client.lock.return_value = lock + item = batching.SwitchQueueItem("uuid", 42) + + wait = tenacity.wait_none() + result = self.queue.acquire_worker_lock( + item, wait=wait, acquire_timeout=0.05) + + self.assertIsNone(result) + self.assertEqual(2, mock_get.call_count) + self.assertEqual(2, lock.acquire.call_count) + + @mock.patch.object(batching.SwitchQueue, "_get_raw_batches") + def test_acquire_worker_lock_success(self, mock_get): + mock_get.return_value = ["work"] + lock = mock.MagicMock() + lock.acquire.side_effect = [False, False, True] + self.client.lock.return_value = lock + item = batching.SwitchQueueItem("uuid", 42) + + wait = tenacity.wait_none() + result = self.queue.acquire_worker_lock( + item, wait=wait, acquire_timeout=0.05) + + self.assertEqual(lock, result) + self.assertEqual(2, mock_get.call_count) + self.assertEqual(3, lock.acquire.call_count) + + +class SwitchBatchTest(fixtures.TestWithFixtures): + def setUp(self): + super(SwitchBatchTest, self).setUp() + self.cfg = self.useFixture(config_fixture.Config()) + + self.queue = mock.Mock() + self.switch_name = "switch1" + self.batch = batching.SwitchBatch( + self.switch_name, switch_queue=self.queue) + + @mock.patch.object(batching.SwitchBatch, "_spawn") + def test_do_batch(self, mock_spawn): + self.queue.add_batch.return_value = "item" + self.queue.wait_for_result.return_value = "output" + + result = self.batch.do_batch("device", ["cmd1"]) + + self.assertEqual("output", result) + self.assertEqual(1, mock_spawn.call_count) + self.queue.add_batch.assert_called_once_with(["cmd1"]) + self.queue.wait_for_result.assert_called_once_with("item", 300) + + def test_execute_pending_batches_skip(self): + self.queue.get_batches.return_value = [] + + result = self.batch._execute_pending_batches("device", "item") + + self.assertIsNone(result) + + def test_execute_pending_batches_skip2(self): + self.queue.get_batches.return_value = ["work"] + # Work was consumed by another worker before we could get the lock. + self.queue.acquire_worker_lock.return_value = None + + result = self.batch._execute_pending_batches("device", "item") + + self.assertIsNone(result) + + @mock.patch.object(batching.SwitchBatch, "_send_commands") + def test_execute_pending_batches_skip3(self, mock_send): + self.queue.get_batches.side_effect = [["work"], None] + # Work was consumed by another worker before we got the lock. + lock = mock.MagicMock() + self.queue.acquire_worker_lock.return_value = lock + + result = self.batch._execute_pending_batches("device", "item") + + self.assertIsNone(result) + self.assertEqual(0, mock_send.call_count) + + @mock.patch.object(batching.SwitchBatch, "_send_commands") + def test_execute_pending_batches_success(self, mock_send): + batches = [ + {"cmds": ["cmd1", "cmd2"]}, + {"cmds": ["cmd3", "cmd4"]}, + ] + self.queue.get_batches.return_value = batches + device = mock.MagicMock() + lock = mock.MagicMock() + self.queue.acquire_worker_lock.return_value = lock + + self.batch._execute_pending_batches(device, "item") + + mock_send.assert_called_once_with(device, batches, lock) + self.queue.acquire_worker_lock.assert_called_once_with("item") + lock.release.assert_called_once_with() + + @mock.patch.object(batching.SwitchBatch, "_send_commands") + def test_execute_pending_batches_failure(self, mock_send): + batches = [ + {"cmds": ["cmd1", "cmd2"]}, + {"cmds": ["cmd3", "cmd4"]}, + ] + self.queue.get_batches.return_value = batches + device = mock.MagicMock() + lock = mock.MagicMock() + self.queue.acquire_worker_lock.return_value = lock + mock_send.side_effect = exc.GenericSwitchBatchError + + self.assertRaises(exc.GenericSwitchBatchError, + self.batch._execute_pending_batches, + device, "item") + + lock.release.assert_called_once_with() + + def test_send_commands_one_batch(self): + device = mock.MagicMock() + device.send_config_set.return_value = "output" + batches = [ + {"cmds": ["cmd1", "cmd2"]}, + ] + lock = mock.MagicMock() + + self.batch._send_commands(device, batches, lock) + + connection = device._get_connection.return_value.__enter__.return_value + device.send_config_set.assert_called_once_with( + connection, ["cmd1", "cmd2"]) + lock.refresh.assert_called_once_with() + lock.is_acquired.assert_called_once_with() + self.queue.record_result.assert_called_once_with( + {"cmds": ["cmd1", "cmd2"], "result": "output"}) + device.save_configuration.assert_called_once_with(connection) + + def test_send_commands_two_batches(self): + device = mock.MagicMock() + device.send_config_set.side_effect = ["output1", "output2"] + batches = [ + {"cmds": ["cmd1", "cmd2"]}, + {"cmds": ["cmd3", "cmd4"]}, + ] + lock = mock.MagicMock() + + self.batch._send_commands(device, batches, lock) + + connection = device._get_connection.return_value.__enter__.return_value + self.assertEqual(2, device.send_config_set.call_count) + device.send_config_set.assert_has_calls([ + mock.call(connection, ["cmd1", "cmd2"]), + mock.call(connection, ["cmd3", "cmd4"]) + ]) + self.assertEqual(2, lock.refresh.call_count) + self.assertEqual(2, lock.is_acquired.call_count) + self.assertEqual(2, self.queue.record_result.call_count) + self.queue.record_result.assert_has_calls([ + mock.call({"cmds": ["cmd1", "cmd2"], "result": "output1"}), + mock.call({"cmds": ["cmd3", "cmd4"], "result": "output2"}) + ]) + device.save_configuration.assert_called_once_with(connection) + self.assertEqual(1, device.save_configuration.call_count) + + def test_send_commands_failure(self): + device = mock.MagicMock() + device.send_config_set.side_effect = Exception("Bang") + batches = [ + {"cmds": ["cmd1", "cmd2"]}, + ] + lock = mock.MagicMock() + + self.batch._send_commands(device, batches, lock) + + connection = device._get_connection.return_value.__enter__.return_value + device.send_config_set.assert_called_once_with( + connection, ["cmd1", "cmd2"]) + lock.refresh.assert_called_once_with() + lock.is_acquired.assert_called_once_with() + self.queue.record_result.assert_called_once_with( + {"cmds": ["cmd1", "cmd2"], "error": "Bang"}) + device.save_configuration.assert_called_once_with(connection) + + def test_send_commands_lock_timeout(self): + device = mock.MagicMock() + device.send_config_set.side_effect = Exception("Bang") + batches = [ + {"cmds": ["cmd1", "cmd2"]}, + ] + lock = mock.MagicMock() + lock.is_acquired.return_value = False + + self.assertRaises(exc.GenericSwitchBatchError, + self.batch._send_commands, device, batches, lock) + + connection = device._get_connection.return_value.__enter__.return_value + device.send_config_set.assert_called_once_with( + connection, ["cmd1", "cmd2"]) + self.assertEqual(0, self.queue.record_result.call_count) + self.assertEqual(0, device.save_configuration.call_count) diff --git a/releasenotes/notes/batching-12d9005924fd9d74.yaml b/releasenotes/notes/batching-12d9005924fd9d74.yaml new file mode 100644 index 00000000..2174fc47 --- /dev/null +++ b/releasenotes/notes/batching-12d9005924fd9d74.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + Adds support for batching of requests using etcd as a task queue. diff --git a/requirements.txt b/requirements.txt index 0b985407..82e9ddb9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,8 @@ # The order of packages is significant, because pip processes them in the order # of appearance. Changing the order has an impact on the overall integration # process, which may cause wedges in the gate later. +etcd3gw>=0.2.4 # Apache-2.0 +eventlet>=0.18.2 # Apache-2.0 stevedore>=1.20.0 # Apache-2.0 netmiko>=2.4.1 # MIT neutron>=13.0.0.0b1 # Apache-2.0