diff --git a/tests/polling/test_polling_plugin_scheduler.py b/tests/polling/test_polling_plugin_scheduler.py
index 03a7809a..f8548aee 100644
--- a/tests/polling/test_polling_plugin_scheduler.py
+++ b/tests/polling/test_polling_plugin_scheduler.py
@@ -9,7 +9,7 @@
 from mock import create_autospec, patch, MagicMock
 
 from yahoo_panoptes.polling.polling_plugin_scheduler import polling_plugin_scheduler_task, \
-    start_polling_plugin_scheduler
+    start_polling_plugin_scheduler, celery_beat_service_started
 from yahoo_panoptes.framework.celery_manager import PanoptesCeleryConfig, PanoptesCeleryPluginScheduler
 from yahoo_panoptes.framework.resources import PanoptesContext, PanoptesResource
 
@@ -30,8 +30,16 @@ def _callback(*args):
 
 
 def mock_get_resources(*args):
-    mock_resource = create_autospec(PanoptesResource)
-    return [mock_resource]
+    # Can't deepcopy a NonCallableMagicMock
+    return [
+        PanoptesResource(resource_site='test',
+                         resource_class='test',
+                         resource_subclass='test',
+                         resource_type='test',
+                         resource_id='test',
+                         resource_endpoint='test',
+                         resource_plugin='test')
+    ]
 
 
 class TestPanoptesPollingPluginScheduler(unittest.TestCase):
@@ -55,8 +63,9 @@ def setUp(self):
 
     @patch('redis.StrictRedis', panoptes_mock_redis_strict_client)
     @patch('kazoo.client.KazooClient', panoptes_mock_kazoo_client)
-    def test_basic_operations(self):
+    def test_basic_operations_first_run(self):
         celery_app = self._scheduler.start()
+
         celery_beat_service = Service(celery_app, max_interval=None, schedule_filename=None,
                                       scheduler_cls=PanoptesCeleryPluginScheduler)
         with patch('yahoo_panoptes.polling.polling_plugin_scheduler.const.DEFAULT_CONFIG_FILE_PATH',
@@ -66,6 +75,20 @@ def test_basic_operations(self):
                 start_polling_plugin_scheduler()
                 polling_plugin_scheduler_task(celery_beat_service)
 
+    @patch('redis.StrictRedis', panoptes_mock_redis_strict_client)
+    @patch('kazoo.client.KazooClient', panoptes_mock_kazoo_client)
+    def test_basic_operations_next_run(self):
+        celery_app = self._scheduler.start()
+
+        celery_beat_service = Service(celery_app, max_interval=None, schedule_filename=None,
+                                      scheduler_cls=PanoptesCeleryPluginScheduler)
+        with patch('yahoo_panoptes.polling.polling_plugin_scheduler.const.DEFAULT_CONFIG_FILE_PATH',
+                   self.panoptes_test_conf_file):
+            with patch('yahoo_panoptes.polling.polling_plugin_scheduler.PanoptesResourceCache.get_resources',
+                       mock_get_resources):
+                start_polling_plugin_scheduler()
+                polling_plugin_scheduler_task(celery_beat_service, iteration_count=1)
+
     @patch('redis.StrictRedis', panoptes_mock_redis_strict_client)
     @patch('kazoo.client.KazooClient', panoptes_mock_kazoo_client)
     def test_error_messages(self):
@@ -156,7 +179,7 @@ def test_polling_plugin_scheduler_context_error(self):
 
     @patch('redis.StrictRedis', panoptes_mock_redis_strict_client)
     @patch('kazoo.client.KazooClient', panoptes_mock_kazoo_client)
-    def test_polling_plugin_scheduler_agent_config_error(self):
+    def test_polling_plugin_scheduler_task_agent_config_error(self):
         with patch('yahoo_panoptes.polling.polling_plugin_scheduler.const.DEFAULT_CONFIG_FILE_PATH',
                    self.panoptes_test_conf_file):
             mock_config = MagicMock(side_effect=Exception)
@@ -192,3 +215,26 @@ def test_celery_errors(self):
                        mock_start):
                 with self.assertRaises(SystemExit):
                     start_polling_plugin_scheduler()
+
+    def test_celery_beat_service_connect_function(self):
+        celery_app = self._scheduler.start()
+        celery_beat_service = Service(celery_app, max_interval=None, schedule_filename=None,
+                                      scheduler_cls=PanoptesCeleryPluginScheduler)
+
+        
self.assertFalse(hasattr(celery_beat_service.scheduler, 'panoptes_context')) + self.assertFalse(hasattr(celery_beat_service.scheduler, 'metadata_kv_store_class')) + self.assertFalse(hasattr(celery_beat_service.scheduler, 'task_prefix')) + + with patch('yahoo_panoptes.polling.polling_plugin_scheduler.polling_plugin_scheduler') as mock_scheduler: + celery_beat_service_started(sender=celery_beat_service) + + self.assertTrue(hasattr(celery_beat_service.scheduler, 'panoptes_context')) + self.assertIsNotNone(celery_beat_service.scheduler.metadata_kv_store_class) + self.assertIsNotNone(celery_beat_service.scheduler.task_prefix) + mock_scheduler.run.assert_called_with(celery_beat_service, None) + + with patch('yahoo_panoptes.polling.polling_plugin_scheduler.polling_plugin_scheduler') as mock_scheduler: + mock_scheduler.run.side_effect = Exception + + with self.assertRaises(SystemExit): + celery_beat_service_started(sender=celery_beat_service) diff --git a/tests/test_framework.py b/tests/test_framework.py index 2bdc3d50..e19fcbee 100644 --- a/tests/test_framework.py +++ b/tests/test_framework.py @@ -5,8 +5,6 @@ from __future__ import absolute_import from builtins import str -from builtins import object -import collections import glob import json import time @@ -23,7 +21,8 @@ from yahoo_panoptes.framework.utilities.snmp.mibs import base from yahoo_panoptes.framework.celery_manager import PanoptesCeleryError, PanoptesCeleryConfig, \ - PanoptesCeleryValidators, PanoptesCeleryInstance, PanoptesCeleryPluginScheduler + PanoptesCeleryValidators, PanoptesCeleryInstance, PanoptesCeleryPluginScheduler, \ + PanoptesUniformScheduler from yahoo_panoptes.framework.configuration_manager import * from yahoo_panoptes.framework.const import RESOURCE_MANAGER_RESOURCE_EXPIRE from yahoo_panoptes.framework.context import * @@ -1092,6 +1091,76 @@ def test_panoptes_celery_plugin_scheduler(self): self.assertEqual(celery_plugin_scheduler.tick(), 0) assert celery_plugin_scheduler.tick() > 0 + @patch(u'redis.StrictRedis', panoptes_mock_redis_strict_client) + @patch('time.time', mock_time) + @patch('yahoo_panoptes.framework.resources.time', mock_time) + def test_panoptes_uniform_plugin_scheduler(self): + + celery_config = PanoptesCeleryConfig('test') + panoptes_context = PanoptesContext(self.panoptes_test_conf_file, + key_value_store_class_list=[PanoptesTestKeyValueStore]) + + kv_store = panoptes_context.get_kv_store(PanoptesTestKeyValueStore) + kv_store.set('plugin_metadata:test_task:last_uniformly_scheduled', '1569967002.65') + + celery_instance = PanoptesCeleryInstance(panoptes_context, celery_config) + celery_uniform_scheduler = PanoptesUniformScheduler(app=celery_instance.celery) + + celery_uniform_scheduler.panoptes_context = panoptes_context + celery_uniform_scheduler.metadata_kv_store_class = PanoptesTestKeyValueStore + + new_schedule = dict() + new_schedule['celery.backend_cleanup'] = { + 'task': 'celery.backend_cleanup', + 'schedule': crontab('0', '4', '*'), + 'options': {'expires': 12 * 3600} + } + new_schedule['test_task'] = { + 'task': const.POLLING_PLUGIN_AGENT_MODULE_NAME, + 'schedule': timedelta(seconds=60), + 'args': ('test_plugin', 'test'), + 'last_run_at': datetime.utcfromtimestamp(DUMMY_TIME - 1), + 'options': { + 'expires': 60, + 'time_limit': 120 + } + } + new_schedule['test_task_1'] = { + 'task': const.POLLING_PLUGIN_AGENT_MODULE_NAME, + 'schedule': timedelta(seconds=60), + 'args': ('test_plugin', 'test'), + 'last_run_at': datetime.utcfromtimestamp(DUMMY_TIME - 1), + 'options': { + 'expires': 60, + 
'time_limit': 120 + } + } + + celery_uniform_scheduler.update(celery_uniform_scheduler.logger, new_schedule, called_by_panoptes=True) + self.assertEqual(len(celery_uniform_scheduler.schedule), len(new_schedule)) + self.assertEqual(celery_uniform_scheduler.SCHEDULE_POPULATED, True) + + del new_schedule['celery.backend_cleanup'] + celery_uniform_scheduler.update(celery_uniform_scheduler.logger, new_schedule, called_by_panoptes=True) + self.assertEqual(len(celery_uniform_scheduler.schedule), len(new_schedule)) + self.assertEqual(celery_uniform_scheduler.SCHEDULE_POPULATED, True) + + new_schedule['test_task_1']['schedule'] = timedelta(seconds=30) + new_schedule['test_task_1']['args'] = ('test_plugin', 'update') + celery_uniform_scheduler.update(celery_uniform_scheduler.logger, new_schedule, called_by_panoptes=True) + + self.assertEqual(celery_uniform_scheduler.schedule['test_task_1'].schedule, timedelta(seconds=30)) + self.assertEqual(celery_uniform_scheduler.schedule['test_task_1'].args, ('test_plugin', 'update')) + + mock_producer = Mock() + with patch('yahoo_panoptes.framework.celery_manager.PanoptesUniformScheduler.apply_entry', return_value=None): + with patch('yahoo_panoptes.framework.celery_manager.PanoptesUniformScheduler.producer', mock_producer): + self.assertIsNone(celery_uniform_scheduler._heap) + self.assertEqual(celery_uniform_scheduler.tick(), 0) + assert celery_uniform_scheduler.tick() > 0 + + self.assertIsNone(celery_uniform_scheduler.obtain_last_uniformly_scheduled_time(None, 'key'), None) + if __name__ == '__main__': unittest.main() diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 11e8392b..f13af2f5 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -8,10 +8,11 @@ from celery import app from mock import create_autospec, patch, MagicMock, Mock - +from datetime import timedelta from celery.beat import Service -from yahoo_panoptes.framework.celery_manager import PanoptesCeleryConfig, PanoptesCeleryPluginScheduler +from yahoo_panoptes.framework.celery_manager import PanoptesCeleryConfig, PanoptesCeleryPluginScheduler, \ + PanoptesScheduleEntry from yahoo_panoptes.framework.resources import PanoptesContext from yahoo_panoptes.framework.plugins.scheduler import PanoptesPluginScheduler from yahoo_panoptes.framework.utilities.tour_of_duty import PanoptesTourOfDuty @@ -128,3 +129,114 @@ def test_celery_beat_error(self): with patch(u'yahoo_panoptes.framework.plugins.scheduler.PanoptesCeleryInstance', mock_celery_instance): celery_app = self._scheduler.start() self.assertIsNone(celery_app) + + +class TestPanoptesUniformScheduleEntry(unittest.TestCase): + + @patch(u'redis.StrictRedis', panoptes_mock_redis_strict_client) + @patch(u'kazoo.client.KazooClient', panoptes_mock_kazoo_client) + def setUp(self): + self.my_dir, self.panoptes_test_conf_file = get_test_conf_file() + self._panoptes_context = PanoptesContext(self.panoptes_test_conf_file, + key_value_store_class_list=[PanoptesTestKeyValueStore], + create_message_producer=False, async_message_producer=False, + create_zookeeper_client=True) + self.mock_kv_store = self._panoptes_context.get_kv_store(PanoptesTestKeyValueStore) + + @patch('time.time') + def test_not_scheduled_before(self, mock_time): + # Test `Task` Has Never Been Uniformly Scheduled + + mock_time.return_value = 60 + + schedule_entry_not_due = PanoptesScheduleEntry( + name='Test Task', task='', + total_run_count=0, schedule=timedelta(seconds=60), + kv_store=self.mock_kv_store) + + self.assertEqual(schedule_entry_not_due.run_at, 72.0) 
+ schedstate = schedule_entry_not_due.is_due() + + self.assertEqual(schedstate.is_due, False) + self.assertEqual(schedstate.next, 12.0) + + # Task Is Due + mock_time.side_effect = [60, 72, 72] + + schedule_entry_due = PanoptesScheduleEntry( + name='Test Task', task='', + total_run_count=0, schedule=timedelta(seconds=60), + kv_store=self.mock_kv_store) + + self.assertEqual(schedule_entry_due.run_at, 72.0) + schedstate = schedule_entry_due.is_due() + + self.assertEqual(schedstate.is_due, True) + self.assertEqual(schedstate.next, 60.0) + self.assertEqual(self.mock_kv_store.get('plugin_metadata:Test Task:last_uniformly_scheduled'), '72') + + # Verify Next Updates The Correct Fields + schedule_entry_due = next(schedule_entry_due) + self.assertEqual(schedule_entry_due.total_run_count, 1) + self.assertEqual(schedule_entry_due.uniformly_scheduled, True) + self.assertIsNotNone(schedule_entry_due.last_run_at) + + @patch('time.time') + def test_scheduled_before_and_overdue(self, mock_time): + """ + Verify that a new instance of the scheduler attempts to pick up where the previous + scheduler left off. In this case it is unable as too much time has passed since + the last uniformly scheduled key was set. + """ + mock_time.return_value = 100 + + schedule_entry = PanoptesScheduleEntry( + name='Test Task', task='', + total_run_count=0, schedule=timedelta(seconds=60), + kv_store=self.mock_kv_store, + last_uniformly_scheduled_at='30') + + self.assertEqual(schedule_entry.run_at, 112.0) + schedstate = schedule_entry.is_due() + + self.assertEqual(schedstate.is_due, False) + self.assertEqual(schedstate.next, 12.0) + + @patch('time.time') + def test_scheduled_before_and_not_overdue(self, mock_time): + """ + Verify that a new instance of the scheduler can pick up where + the previous one left off. + """ + mock_time.return_value = 70 + + schedule_entry = PanoptesScheduleEntry( + name='Test Task', task='', + total_run_count=0, schedule=timedelta(seconds=60), + kv_store=self.mock_kv_store, + last_uniformly_scheduled_at='60') + + self.assertEqual(schedule_entry.run_at, 120.0) + schedstate = schedule_entry.is_due() + + self.assertEqual(schedstate.is_due, False) + self.assertEqual(schedstate.next, 50.0) + + def test_schedule_entry_unique_identifier_does_not_throw(self): + + self.assertEqual( + PanoptesScheduleEntry.schedule_entry_unique_identifier(timedelta(seconds=60), ('Test', 1, 1.0, b'bytes')), + "60.0|Test|1|1.0|b'bytes'" + ) + + def test_invalid_key_in_redis_is_handled(self): + + schedule_entry = PanoptesScheduleEntry( + name='Test Task', task='', + total_run_count=0, schedule=timedelta(seconds=60), + kv_store=self.mock_kv_store, + last_uniformly_scheduled_at='60f') + + schedstate = schedule_entry.is_due() + self.assertEqual(schedstate.is_due, False) + self.assertEqual(schedstate.next, 60) diff --git a/yahoo_panoptes/discovery/discovery_plugin_scheduler.py b/yahoo_panoptes/discovery/discovery_plugin_scheduler.py index 53708103..08c22595 100644 --- a/yahoo_panoptes/discovery/discovery_plugin_scheduler.py +++ b/yahoo_panoptes/discovery/discovery_plugin_scheduler.py @@ -72,14 +72,15 @@ def __init__(self): app_name=const.DISCOVERY_PLUGIN_SCHEDULER_CELERY_APP_NAME) -def discovery_plugin_scheduler_task(celery_beat_service): +def discovery_plugin_scheduler_task(celery_beat_service, iteration_count=0): """ This function is the workhorse of the Discovery Plugin Scheduler module. It detects changes in plugins and their configuration and updates the Celery Beat schedule accordingly. 
Args: celery_beat_service (celery.beat.Service): The Celery Beat Service object associated with this Plugin Scheduler - + iteration_count (int): The number of times the scheduler task has been called. The count is tracked by the + PanoptesTourOfDuty class inside of the PanoptesPluginScheduler class. Returns: None """ @@ -141,7 +142,7 @@ def discovery_plugin_scheduler_task(celery_beat_service): try: scheduler = celery_beat_service.scheduler - scheduler.update(logger, new_schedule) + scheduler.update(logger, new_schedule, called_by_panoptes=True) end_time = time.time() logger.info(u'Scheduled %d tasks in %.2fs' % (len(new_schedule), end_time - start_time)) diff --git a/yahoo_panoptes/enrichment/enrichment_plugin_scheduler.py b/yahoo_panoptes/enrichment/enrichment_plugin_scheduler.py index 05844d7b..d468b254 100644 --- a/yahoo_panoptes/enrichment/enrichment_plugin_scheduler.py +++ b/yahoo_panoptes/enrichment/enrichment_plugin_scheduler.py @@ -80,7 +80,7 @@ def __init__(self): __init__(app_name=const.ENRICHMENT_PLUGIN_SCHEDULER_CELERY_APP_NAME) -def enrichment_plugin_scheduler_task(celery_beat_service): +def enrichment_plugin_scheduler_task(celery_beat_service, iteration_count=0): """ This function is the workhorse of the Enrichment Plugin Scheduler module. It detects changes in plugins and their configuration and updates the Celery Beat schedule accordingly. @@ -88,7 +88,8 @@ def enrichment_plugin_scheduler_task(celery_beat_service): Args: celery_beat_service (celery.beat.Service): The Celery Beat Service instance associated with this Plugin\ Scheduler - + iteration_count (int): The number of times the scheduler task has been called. The count is tracked by the + PanoptesTourOfDuty class inside of the PanoptesPluginScheduler class. Returns: None @@ -184,7 +185,7 @@ def enrichment_plugin_scheduler_task(celery_beat_service): try: scheduler = celery_beat_service.scheduler - scheduler.update(logger, new_schedule) + scheduler.update(logger, new_schedule, called_by_panoptes=True) end_time = time.time() logger.info(u'Scheduled %d tasks in %.2fs' % (len(new_schedule), end_time - start_time)) diff --git a/yahoo_panoptes/framework/celery_manager.py b/yahoo_panoptes/framework/celery_manager.py index 09e4dfa8..0f3786a1 100644 --- a/yahoo_panoptes/framework/celery_manager.py +++ b/yahoo_panoptes/framework/celery_manager.py @@ -7,10 +7,14 @@ """ from builtins import object import heapq +import time +import copy +import mmh3 import threading from celery import Celery -from celery.beat import Scheduler, event_t +from celery.beat import Scheduler, ScheduleEntry, event_t +from celery.schedules import schedstate, crontab from yahoo_panoptes.framework import const from yahoo_panoptes.framework.context import PanoptesContext @@ -132,20 +136,34 @@ class PanoptesCeleryPluginScheduler(Scheduler): """ The base plugin scheduler class in Panoptes """ - def update(self, logger, new_schedule): + + def merge_inplace(self, b, called_by_panoptes=False): + """ + Updates the set of schedule entries. + + This function is defined within the PanoptesCeleryPluginScheduler + class to allow for backwards compatibility with the new + called_by_panoptes argument. 
The argument is needed for + the UniformScheduler Class + """ + super(PanoptesCeleryPluginScheduler, self).merge_inplace(b) + + def update(self, logger, new_schedule, called_by_panoptes=False): """ Updates the currently installed scheduled Args: logger (logging.logger): The logger to use new_schedule (dict): The new schedule + called_by_panoptes (bool): Was .update() + called by panoptes or a function within celery/beat.py Returns: None """ logger.debug(u'New schedule: %s' % str(new_schedule)) logger.info(u'Going to schedule %d tasks' % len(new_schedule)) with thread_lock: - self.merge_inplace(new_schedule) + self.merge_inplace(new_schedule, called_by_panoptes) logger.info(u'Scheduler now has %d tasks' % len(self.schedule)) def tick(self, event_t=event_t, min=min, heappop=heapq.heappop, heappush=heapq.heappush): @@ -161,3 +179,256 @@ def tick(self, event_t=event_t, min=min, heappop=heapq.heappop, heappush=heapq.h ) return response + + +class PanoptesScheduleEntry(ScheduleEntry): + """An entry in the scheduler. + + Arguments: + name (str): see :attr:`name`. + schedule (~celery.schedules.schedule): see :attr:`schedule`. + args (Tuple): see :attr:`args`. + kwargs (Dict): see :attr:`kwargs`. + options (Dict): see :attr:`options`. + last_run_at (~datetime.datetime): see :attr:`last_run_at`. + total_run_count (int): see :attr:`total_run_count`. + relative (bool): Is the time relative to when the server starts? + run_at (int): Epoch time when the schedule entry should run next + uniformly_scheduled (bool): Whether or not the ScheduleEntry has been + uniformly scheduled (splay added to the initial due date). + kv_store: ('redis.client.Redis'): Connection to the KV Store + last_uniformly_scheduled_at (str): Last time this `job` was scheduled + by a different scheduler process. + """ + + def __init__(self, name=None, task=None, last_run_at=None, + total_run_count=None, schedule=None, args=(), + kwargs=None, options=None, relative=None, + app=None, run_at=None, uniformly_scheduled=False, + kv_store=None, last_uniformly_scheduled_at=None): + super(PanoptesScheduleEntry, self).__init__( + name=name, task=task, last_run_at=last_run_at, + total_run_count=total_run_count, schedule=schedule, + args=args, kwargs=kwargs, options=options, + relative=relative, app=app + ) + + self.run_at = run_at + self.uniformly_scheduled = uniformly_scheduled + self.kv_store = kv_store + self.last_uniformly_scheduled_at = last_uniformly_scheduled_at + + if isinstance(self.schedule, crontab): + return + + try: + # Only add splay on the first run + if not self.uniformly_scheduled: + print(f'{self.name} has not been uniformly scheduled by this process.') + + plugin_execution_frequency = self.schedule.seconds + time_now = time.time() + expected_execution_date_from_last_schedule = 0 + + print(f'Checking to see if {self.name} has been scheduled in the ' + f'last {plugin_execution_frequency} seconds') + if self.last_uniformly_scheduled_at is not None: + print(f'{self.name} was last uniformly scheduled at {self.last_uniformly_scheduled_at}') + self.last_uniformly_scheduled_at = float(self.last_uniformly_scheduled_at) + expected_execution_date_from_last_schedule = self.last_uniformly_scheduled_at + \ + plugin_execution_frequency + else: + print(f'{self.name} has never been uniformly scheduled') + + if expected_execution_date_from_last_schedule >= time_now > self.last_uniformly_scheduled_at: + print(f'Picking up where the previous scheduler process left off. 
Scheduling ' + f'{self.name} to execute in ' + f'{expected_execution_date_from_last_schedule - time.time()} seconds.') + self.run_at = expected_execution_date_from_last_schedule + return + else: + print(f'Unable to schedule {self.name} where the previous scheduler process left off') + + splay_s = mmh3.hash(self.name, signed=False) % min(self.schedule.seconds, 60) + + self.run_at = time_now + splay_s + print(f'Uniformly scheduling {self.name} with splay {splay_s} due {self.run_at}') + + except Exception as e: + print(f'Error Scheduling {repr(e)}') + + @staticmethod + def schedule_entry_unique_identifier(schedule, args): + return "{}|{}".format(str(float(schedule.seconds)), "|".join(map(str, args))) + + def __next__(self, last_run_at=None): + # Set uniformly_scheduled to True so splay isn't added again. + return self.__class__(**dict( + self, + last_run_at=self.default_now(), + total_run_count=self.total_run_count + 1, + uniformly_scheduled=True + )) + + def is_due(self): + try: + if not self.uniformly_scheduled and not isinstance(self.schedule, crontab): + run_in = self.run_at - time.time() + + if run_in > 0: + value = schedstate(is_due=False, next=run_in) + else: + value = schedstate(is_due=True, next=self.schedule.seconds) + + else: + value = super(PanoptesScheduleEntry, self).is_due() + + except Exception as e: + # If there is an issue with the key set in redis + # Assume 60 second Interval. + print(f'{self.name} Attribute Error {repr(e)}') + self.run_at = time.time() + 60 + value = schedstate(is_due=False, next=60) + + if value[0] and self.kv_store: + # This `ScheduleEntry` is due to be executed. + # Update the `last_uniformly_scheduled` key in Redis + uniformly_scheduled = time.time() + + last_uniformly_scheduled_at_key = ':'.join([ + 'plugin_metadata', + self.name, + 'last_uniformly_scheduled' + ]) + + self.kv_store.set( + last_uniformly_scheduled_at_key, + str(uniformly_scheduled), + expire=int(self.schedule.seconds * 2) + ) + + return value + + +class PanoptesUniformScheduler(PanoptesCeleryPluginScheduler): + Entry = PanoptesScheduleEntry + UNIFORM_PLUGIN_LAST_UNIFORMLY_SCHEDULED_KEY = 'last_uniformly_scheduled' + SCHEDULE_POPULATED = False + + def obtain_last_uniformly_scheduled_time(self, kv_store, key): + + try: + last_uniformly_scheduled_at_key = ':'.join([ + 'plugin_metadata', + key, + 'last_uniformly_scheduled' + ]) + + last_uniformly_scheduled_at = kv_store.get(last_uniformly_scheduled_at_key) + return last_uniformly_scheduled_at + + except Exception as e: + print(f'PanoptesUniformScheduler::obtain_last_uniformly_scheduled_time. 
{repr(e)}') + return None + + def merge_inplace(self, b, called_by_panoptes=False): + + if not called_by_panoptes: + super(PanoptesUniformScheduler, self).merge_inplace(b) + return + + metadata_kv_store = self.panoptes_context.get_kv_store(self.metadata_kv_store_class) + schedule = self.schedule + + A, B = set(schedule), set(b) + + for key in A ^ B: + schedule.pop(key, None) + + for key in B: + + if schedule.get(key): + panoptes_schedule_entry = schedule[key] + + existing_panoptes_schedule_entry = PanoptesScheduleEntry.\ + schedule_entry_unique_identifier(panoptes_schedule_entry.schedule, panoptes_schedule_entry.args) + new_panoptes_schedule_entry_candidate = PanoptesScheduleEntry.\ + schedule_entry_unique_identifier(b[key]['schedule'], b[key]['args']) + + if existing_panoptes_schedule_entry == new_panoptes_schedule_entry_candidate: + print(f'Skipping {new_panoptes_schedule_entry_candidate}, ' + f'there is already a matching ScheduleEntry being executed') + continue + else: + print(f'Found Existing Schedule Entry which didn\'t match the new one.' + f'{existing_panoptes_schedule_entry} !== {new_panoptes_schedule_entry_candidate}' + f' replacing now.') + b[key]['last_uniformly_scheduled_at'] = self.obtain_last_uniformly_scheduled_time(metadata_kv_store, + key) + b[key]['kv_store'] = metadata_kv_store + schedule[key].update(self.Entry(**dict(b[key], name=key, app=self.app))) + + else: + b[key]['last_uniformly_scheduled_at'] = self.obtain_last_uniformly_scheduled_time(metadata_kv_store, + key) + b[key]['kv_store'] = metadata_kv_store + print(f'Entry is {self.Entry}') + entry = self.Entry(**dict(b[key], name=key, app=self.app)) + schedule[key] = entry + + if called_by_panoptes and not self.SCHEDULE_POPULATED: + print('Panoptes merge_inplace call finished, setting schedule_populated to loosen tick loop.') + self.SCHEDULE_POPULATED = True + + def tick(self, event_t=event_t, min=min, heappop=heapq.heappop, heappush=heapq.heappush): + """ + Make the tick function thread safe + + Run a tick - one iteration of the scheduler. + + Executes on due task per call + + Returns: + float: preferred delay in seconds for next call. + """ + with thread_lock: + adjust = self.adjust + max_interval = self.max_interval + + if self._heap is None or not self.schedules_equal(self.old_schedulers, self.schedule): + self.old_schedulers = copy.copy(self.schedule) + + print('Repopulating the heap') + self.populate_heap() + + H = self._heap + + if not H: + return max_interval + + # event_t = namedtuple('event_t', ('time', 'priority', 'entry')) + event = H[0] + entry = event[2] + + is_due, next_time_to_run = self.is_due(entry) + + if is_due: + verify = heappop(H) + + if verify is event: + next_entry = self.reserve(entry) + self.apply_entry(entry, producer=self.producer) + heappush(H, event_t(self._when(next_entry, next_time_to_run), event[1], next_entry)) + return 0 + else: + heappush(H, verify) + return min(verify[0], max_interval) + + # Temporarily spin in a tight loop until the + # @beat_init.connect callback occurs and calls run on the + # (yahoo_panoptes.framework.plugins.scheduler.)PanoptesPluginScheduler + # which calls update (-> merge_inplace) on the cached schedule. 
+ if self.SCHEDULE_POPULATED is False: + return min(adjust(next_time_to_run), 0.01) + + return min(adjust(next_time_to_run) or max_interval, max_interval) diff --git a/yahoo_panoptes/framework/plugins/scheduler.py b/yahoo_panoptes/framework/plugins/scheduler.py index 46e8fb83..297a9d90 100644 --- a/yahoo_panoptes/framework/plugins/scheduler.py +++ b/yahoo_panoptes/framework/plugins/scheduler.py @@ -167,7 +167,8 @@ def _plugin_scheduler_task_thread(self): if self._lock.locked: self._cycles_without_lock = 0 try: - self._plugin_scheduler_task(self._plugin_scheduler_celery_beat_service) + self._plugin_scheduler_task(self._plugin_scheduler_celery_beat_service, + self._tour_of_duty.iterations) self._tour_of_duty.increment_task_count() except Exception: logger.exception(u'Error trying to execute plugin scheduler task') @@ -205,14 +206,16 @@ def _shutdown(self): Returns: None """ + logger = self._logger + if self._shutdown_plugin_scheduler.is_set(): print(u'%s Plugin Scheduler already in the process of shutdown, ignoring redundant call') return shutdown_interval = int(int(self._config[self._plugin_type][u'plugin_scan_interval']) * 2) - print(u'Shutdown/restart requested - may take up to %s seconds' % shutdown_interval) + logger.info(u'Shutdown/restart requested - may take up to %s seconds' % shutdown_interval) - print(u'Signalling for %s Plugin Scheduler Task Thread to shutdown' % self._plugin_type_display_name) + logger.info(u'Signalling for %s Plugin Scheduler Task Thread to shutdown' % self._plugin_type_display_name) self._shutdown_plugin_scheduler.set() if self._t != threading.currentThread(): @@ -220,21 +223,21 @@ def _shutdown(self): self._t.join() if (self._t is None) or (not self._t.isAlive()): - print(u'%s Plugin Scheduler Task Thread is not active - shutting down other services' % + logger.info(u'%s Plugin Scheduler Task Thread is not active - shutting down other services' % self._plugin_type_display_name) else: - print(u'%s Plugin Scheduler shutdown called from plugin scheduler task thread' % + logger.info(u'%s Plugin Scheduler shutdown called from plugin scheduler task thread' % self._plugin_type_display_name) if self._plugin_scheduler_celery_beat_service: - print(u'Shutting down Celery Beat Service') + logger.info(u'Shutting down Celery Beat Service') self._plugin_scheduler_celery_beat_service.stop() if self._lock: - print(u'Releasing lock') + logger.info(u'Releasing lock') self._lock.release() - print(u'Plugin Scheduler shutdown complete') + logger.info(u'Plugin Scheduler shutdown complete') sys.exit() def _install_signal_handlers(self): diff --git a/yahoo_panoptes/framework/utilities/tour_of_duty.py b/yahoo_panoptes/framework/utilities/tour_of_duty.py index 066f4cc2..e6ccaf05 100644 --- a/yahoo_panoptes/framework/utilities/tour_of_duty.py +++ b/yahoo_panoptes/framework/utilities/tour_of_duty.py @@ -84,6 +84,16 @@ def time_completed(self): def memory_growth_completed(self): return (self._get_memory_utilization_in_mb() - self._initial_memory_mb) > self.adjusted_memory_growth_mb + @property + def iterations(self): + """ + Returns the number of iterations the scheduler has completed + + Returns: + int + """ + return self._task_count + def increment_task_count(self): """ Increments the task count which counts towards the Tour Of Duty diff --git a/yahoo_panoptes/polling/polling_plugin_scheduler.py b/yahoo_panoptes/polling/polling_plugin_scheduler.py index 892c1e1b..d5159327 100644 --- a/yahoo_panoptes/polling/polling_plugin_scheduler.py +++ 
b/yahoo_panoptes/polling/polling_plugin_scheduler.py @@ -14,6 +14,7 @@ """ import faulthandler import sys +import copy import time from resource import getrusage, RUSAGE_SELF from datetime import datetime, timedelta @@ -40,6 +41,7 @@ polling_plugin_scheduler = None celery = None logger = None +cached_schedule = None class PanoptesPollingPluginSchedulerError(PanoptesBaseException): @@ -81,27 +83,16 @@ def __init__(self): super(PanoptesCeleryPollingAgentConfig, self).__init__(app_name=const.POLLING_PLUGIN_SCHEDULER_CELERY_APP_NAME) -def polling_plugin_scheduler_task(celery_beat_service): - """ - This function is the workhorse of the Polling Plugin Scheduler module. It detects changes in plugins and their - configuration and updates the Celery Beat schedule accordingly. - - Args: - celery_beat_service (celery.beat.Service): The Celery Beat Service instance associated with this Plugin\ - Scheduler - - Returns: - None +def polling_plugin_get_schedule(): - """ start_time = time.time() try: resource_cache = PanoptesResourceCache(panoptes_context) resource_cache.setup_resource_cache() - except: - logger.exception(u'Could not create resource cache, skipping cycle') - return + except Exception as e: + logger.exception(u'Could not create resource cache. {}'.format(repr(e))) + return {} try: plugin_manager = PanoptesPluginManager( @@ -113,9 +104,9 @@ def polling_plugin_scheduler_task(celery_beat_service): ) plugins = plugin_manager.getPluginsOfCategory(category_name=u'polling') logger.info(u'Found %d plugins' % len(plugins)) - except: - logger.exception(u'Error trying to load Polling plugins, skipping cycle') - return + except Exception as e: + logger.exception(u'Error trying to load Polling plugins, skipping cycle. {}'.format(repr(e))) + return {} new_schedule = dict() @@ -149,8 +140,8 @@ def polling_plugin_scheduler_task(celery_beat_service): if len(resource_set) == 0: logger.info( - u'No resources found for plugin "%s" after applying resource filter "%s", skipping plugin' % ( - plugin.name, plugin.resource_filter)) + u'No resources found for plugin "%s" after applying resource filter "%s", skipping plugin' % ( + plugin.name, plugin.resource_filter)) logger.info(u'Length of resource set {} for plugin {}'.format(len(resource_set), plugin.name)) @@ -179,9 +170,47 @@ def polling_plugin_scheduler_task(celery_beat_service): plugin_manager.unload_modules() logger.info('Unloaded plugin modules. Length of sys.modules after unloading modules: %d' % len(sys.modules)) + logger.info('Created Schedule With {} entries in {} seconds' + .format(len(new_schedule.keys()), time.time() - start_time)) + + return new_schedule + + +def polling_plugin_scheduler_task(celery_beat_service, iteration_count=0): + """ + This function is the workhorse of the Polling Plugin Scheduler module. It detects changes in plugins and their + configuration and updates the Celery Beat schedule accordingly. + + On a fresh start this function uses a cached copy of the schedule to immediately populate the + celery job queue. This is done to minimize the number of jobs that get dropped (due during the + switchover period) and need to be rescheduled with added splay which kills the previous cadence. 
+ + Measured Switchover Time Using Cache = 50ms + Measured Switchover Time Without The Cached Schedule Entries = 3-4 seconds in a big deployment + + Args: + celery_beat_service (celery.beat.Service): The Celery Beat Service instance associated with this Plugin\ + Scheduler + iteration_count (int): The number of times the scheduler task has been called. The count is tracked by the + PanoptesTourOfDuty class inside of the PanoptesPluginScheduler class. + + Returns: + None + + """ + start_time = time.time() + + if iteration_count == 0: + logger.info('Using the cached schedule on the first run to resume task execution immediately.') + new_schedule = copy.deepcopy(cached_schedule) + cached_schedule.clear() + logger.info('Updated `new_schedule`, clearing `cached_schedule` to lower memory footprint.') + else: + new_schedule = polling_plugin_get_schedule() + try: scheduler = celery_beat_service.scheduler - scheduler.update(logger, new_schedule) + scheduler.update(logger, new_schedule, called_by_panoptes=True) end_time = time.time() logger.info(u'Scheduled %d tasks in %.2fs' % (len(new_schedule), end_time - start_time)) @@ -202,8 +231,7 @@ def start_polling_plugin_scheduler(): Returns: None """ - global polling_plugin_scheduler, celery, logger, panoptes_context - + global polling_plugin_scheduler, celery, logger, panoptes_context, cached_schedule try: panoptes_context = PanoptesPollingPluginSchedulerContext() except Exception as e: @@ -228,6 +256,8 @@ def start_polling_plugin_scheduler(): except Exception as e: sys.exit(u'Could not create a Plugin Scheduler object: %s' % repr(e)) + cached_schedule = polling_plugin_get_schedule() + try: celery = polling_plugin_scheduler.start() except Exception as e: @@ -252,6 +282,7 @@ def celery_beat_service_started(sender=None, args=None, **kwargs): """ global polling_plugin_scheduler sender.scheduler.panoptes_context = panoptes_context + sender.scheduler.metadata_kv_store_class = PanoptesPollingPluginAgentKeyValueStore sender.scheduler.task_prefix = const.POLLING_PLUGIN_SCHEDULER_CELERY_TASK_PREFIX try: