Skip to content

Commit

Permalink
Merge 91d2d44 into 59b0403
Browse files Browse the repository at this point in the history
  • Loading branch information
road-cycling committed Aug 10, 2020
2 parents 59b0403 + 91d2d44 commit 3ec851f
Show file tree
Hide file tree
Showing 9 changed files with 594 additions and 50 deletions.
56 changes: 51 additions & 5 deletions tests/polling/test_polling_plugin_scheduler.py
Expand Up @@ -9,7 +9,7 @@
from mock import create_autospec, patch, MagicMock

from yahoo_panoptes.polling.polling_plugin_scheduler import polling_plugin_scheduler_task, \
start_polling_plugin_scheduler
start_polling_plugin_scheduler, celery_beat_service_started

from yahoo_panoptes.framework.celery_manager import PanoptesCeleryConfig, PanoptesCeleryPluginScheduler
from yahoo_panoptes.framework.resources import PanoptesContext, PanoptesResource
Expand All @@ -30,8 +30,16 @@ def _callback(*args):


def mock_get_resources(*args):
mock_resource = create_autospec(PanoptesResource)
return [mock_resource]
# Can't deepcopy a NonCallableMagicMock
return [
PanoptesResource(resource_site='test',
resource_class='test',
resource_subclass='test',
resource_type='test',
resource_id='test',
resource_endpoint='test',
resource_plugin='test')
]


class TestPanoptesPollingPluginScheduler(unittest.TestCase):
Expand All @@ -55,8 +63,9 @@ def setUp(self):

@patch('redis.StrictRedis', panoptes_mock_redis_strict_client)
@patch('kazoo.client.KazooClient', panoptes_mock_kazoo_client)
def test_basic_operations(self):
def test_basic_operations_first_run(self):
celery_app = self._scheduler.start()

celery_beat_service = Service(celery_app, max_interval=None, schedule_filename=None,
scheduler_cls=PanoptesCeleryPluginScheduler)
with patch('yahoo_panoptes.polling.polling_plugin_scheduler.const.DEFAULT_CONFIG_FILE_PATH',
Expand All @@ -66,6 +75,20 @@ def test_basic_operations(self):
start_polling_plugin_scheduler()
polling_plugin_scheduler_task(celery_beat_service)

@patch('redis.StrictRedis', panoptes_mock_redis_strict_client)
@patch('kazoo.client.KazooClient', panoptes_mock_kazoo_client)
def test_basic_operations_next_run(self):
celery_app = self._scheduler.start()

celery_beat_service = Service(celery_app, max_interval=None, schedule_filename=None,
scheduler_cls=PanoptesCeleryPluginScheduler)
with patch('yahoo_panoptes.polling.polling_plugin_scheduler.const.DEFAULT_CONFIG_FILE_PATH',
self.panoptes_test_conf_file):
with patch('yahoo_panoptes.polling.polling_plugin_scheduler.PanoptesResourceCache.get_resources',
mock_get_resources):
start_polling_plugin_scheduler()
polling_plugin_scheduler_task(celery_beat_service, iteration_count=1)

@patch('redis.StrictRedis', panoptes_mock_redis_strict_client)
@patch('kazoo.client.KazooClient', panoptes_mock_kazoo_client)
def test_error_messages(self):
Expand Down Expand Up @@ -156,7 +179,7 @@ def test_polling_plugin_scheduler_context_error(self):

@patch('redis.StrictRedis', panoptes_mock_redis_strict_client)
@patch('kazoo.client.KazooClient', panoptes_mock_kazoo_client)
def test_polling_plugin_scheduler_agent_config_error(self):
def test_polling_plugin_scheduler_tasklling_plugin_scheduler_agent_config_error(self):
with patch('yahoo_panoptes.polling.polling_plugin_scheduler.const.DEFAULT_CONFIG_FILE_PATH',
self.panoptes_test_conf_file):
mock_config = MagicMock(side_effect=Exception)
Expand Down Expand Up @@ -192,3 +215,26 @@ def test_celery_errors(self):
mock_start):
with self.assertRaises(SystemExit):
start_polling_plugin_scheduler()

def test_celery_beat_service_connect_function(self):
celery_app = self._scheduler.start()
celery_beat_service = Service(celery_app, max_interval=None, schedule_filename=None,
scheduler_cls=PanoptesCeleryPluginScheduler)

self.assertFalse(hasattr(celery_beat_service.scheduler, 'panoptes_context'))
self.assertFalse(hasattr(celery_beat_service.scheduler, 'metadata_kv_store_class'))
self.assertFalse(hasattr(celery_beat_service.scheduler, 'task_prefix'))

with patch('yahoo_panoptes.polling.polling_plugin_scheduler.polling_plugin_scheduler') as mock_scheduler:
celery_beat_service_started(sender=celery_beat_service)

self.assertTrue(hasattr(celery_beat_service.scheduler, 'panoptes_context'))
self.assertIsNotNone(celery_beat_service.scheduler.metadata_kv_store_class)
self.assertIsNotNone(celery_beat_service.scheduler.task_prefix)
mock_scheduler.run.assert_called_with(celery_beat_service, None)

with patch('yahoo_panoptes.polling.polling_plugin_scheduler.polling_plugin_scheduler') as mock_scheduler:
mock_scheduler.run.side_effect = Exception

with self.assertRaises(SystemExit):
celery_beat_service_started(sender=celery_beat_service)
75 changes: 72 additions & 3 deletions tests/test_framework.py
Expand Up @@ -5,8 +5,6 @@
from __future__ import absolute_import

from builtins import str
from builtins import object
import collections
import glob
import json
import time
Expand All @@ -23,7 +21,8 @@

from yahoo_panoptes.framework.utilities.snmp.mibs import base
from yahoo_panoptes.framework.celery_manager import PanoptesCeleryError, PanoptesCeleryConfig, \
PanoptesCeleryValidators, PanoptesCeleryInstance, PanoptesCeleryPluginScheduler
PanoptesCeleryValidators, PanoptesCeleryInstance, PanoptesCeleryPluginScheduler, \
PanoptesUniformScheduler
from yahoo_panoptes.framework.configuration_manager import *
from yahoo_panoptes.framework.const import RESOURCE_MANAGER_RESOURCE_EXPIRE
from yahoo_panoptes.framework.context import *
Expand Down Expand Up @@ -1092,6 +1091,76 @@ def test_panoptes_celery_plugin_scheduler(self):
self.assertEqual(celery_plugin_scheduler.tick(), 0)
assert celery_plugin_scheduler.tick() > 0

@patch(u'redis.StrictRedis', panoptes_mock_redis_strict_client)
@patch('time.time', mock_time)
@patch('yahoo_panoptes.framework.resources.time', mock_time)
def test_panoptes_uniform_plugin_scheduler(self):

celery_config = PanoptesCeleryConfig('test')
panoptes_context = PanoptesContext(self.panoptes_test_conf_file,
key_value_store_class_list=[PanoptesTestKeyValueStore])

kv_store = panoptes_context.get_kv_store(PanoptesTestKeyValueStore)
kv_store.set('plugin_metadata:test_task:last_uniformly_scheduled', '1569967002.65')

celery_instance = PanoptesCeleryInstance(panoptes_context, celery_config)
celery_uniform_scheduler = PanoptesUniformScheduler(app=celery_instance.celery)

celery_uniform_scheduler.panoptes_context = panoptes_context
celery_uniform_scheduler.metadata_kv_store_class = PanoptesTestKeyValueStore

new_schedule = dict()
new_schedule['celery.backend_cleanup'] = {
'task': 'celery.backend_cleanup',
'schedule': crontab('0', '4', '*'),
'options': {'expires': 12 * 3600}
}
new_schedule['test_task'] = {
'task': const.POLLING_PLUGIN_AGENT_MODULE_NAME,
'schedule': timedelta(seconds=60),
'args': ('test_plugin', 'test'),
'last_run_at': datetime.utcfromtimestamp(DUMMY_TIME - 1),
'options': {
'expires': 60,
'time_limit': 120
}
}
new_schedule['test_task_1'] = {
'task': const.POLLING_PLUGIN_AGENT_MODULE_NAME,
'schedule': timedelta(seconds=60),
'args': ('test_plugin', 'test'),
'last_run_at': datetime.utcfromtimestamp(DUMMY_TIME - 1),
'options': {
'expires': 60,
'time_limit': 120
}
}

celery_uniform_scheduler.update(celery_uniform_scheduler.logger, new_schedule, called_by_panoptes=True)
self.assertEqual(len(celery_uniform_scheduler.schedule), len(new_schedule))
self.assertEqual(celery_uniform_scheduler.SCHEDULE_POPULATED, True)

del new_schedule['celery.backend_cleanup']
celery_uniform_scheduler.update(celery_uniform_scheduler.logger, new_schedule, called_by_panoptes=True)
self.assertEqual(len(celery_uniform_scheduler.schedule), len(new_schedule))
self.assertEqual(celery_uniform_scheduler.SCHEDULE_POPULATED, True)

new_schedule['test_task_1']['schedule'] = timedelta(seconds=30)
new_schedule['test_task_1']['args'] = ('test_plugin', 'update')
celery_uniform_scheduler.update(celery_uniform_scheduler.logger, new_schedule, called_by_panoptes=True)

self.assertEqual(celery_uniform_scheduler.schedule['test_task_1'].schedule, timedelta(seconds=30))
self.assertEqual(celery_uniform_scheduler.schedule['test_task_1'].args, ('test_plugin', 'update'))

mock_producer = Mock()
with patch('yahoo_panoptes.framework.celery_manager.PanoptesUniformScheduler.apply_entry', return_value=None):
with patch('yahoo_panoptes.framework.celery_manager.PanoptesUniformScheduler.producer', mock_producer):
self.assertIsNone(celery_uniform_scheduler._heap)
self.assertEqual(celery_uniform_scheduler.tick(), 0)
assert celery_uniform_scheduler.tick() > 0

self.assertIsNone(celery_uniform_scheduler.obtain_last_uniformly_scheduled_time(None, 'key'), None)


if __name__ == '__main__':
unittest.main()
116 changes: 114 additions & 2 deletions tests/test_scheduler.py
Expand Up @@ -8,10 +8,11 @@

from celery import app
from mock import create_autospec, patch, MagicMock, Mock

from datetime import timedelta
from celery.beat import Service

from yahoo_panoptes.framework.celery_manager import PanoptesCeleryConfig, PanoptesCeleryPluginScheduler
from yahoo_panoptes.framework.celery_manager import PanoptesCeleryConfig, PanoptesCeleryPluginScheduler, \
PanoptesScheduleEntry
from yahoo_panoptes.framework.resources import PanoptesContext
from yahoo_panoptes.framework.plugins.scheduler import PanoptesPluginScheduler
from yahoo_panoptes.framework.utilities.tour_of_duty import PanoptesTourOfDuty
Expand Down Expand Up @@ -128,3 +129,114 @@ def test_celery_beat_error(self):
with patch(u'yahoo_panoptes.framework.plugins.scheduler.PanoptesCeleryInstance', mock_celery_instance):
celery_app = self._scheduler.start()
self.assertIsNone(celery_app)


class TestPanoptesUniformScheduleEntry(unittest.TestCase):

@patch(u'redis.StrictRedis', panoptes_mock_redis_strict_client)
@patch(u'kazoo.client.KazooClient', panoptes_mock_kazoo_client)
def setUp(self):
self.my_dir, self.panoptes_test_conf_file = get_test_conf_file()
self._panoptes_context = PanoptesContext(self.panoptes_test_conf_file,
key_value_store_class_list=[PanoptesTestKeyValueStore],
create_message_producer=False, async_message_producer=False,
create_zookeeper_client=True)
self.mock_kv_store = self._panoptes_context.get_kv_store(PanoptesTestKeyValueStore)

@patch('time.time')
def test_not_scheduled_before(self, mock_time):
# Test `Task` Has Never Been Uniformly Scheduled

mock_time.return_value = 60

schedule_entry_not_due = PanoptesScheduleEntry(
name='Test Task', task='',
total_run_count=0, schedule=timedelta(seconds=60),
kv_store=self.mock_kv_store)

self.assertEqual(schedule_entry_not_due.run_at, 72.0)
schedstate = schedule_entry_not_due.is_due()

self.assertEqual(schedstate.is_due, False)
self.assertEqual(schedstate.next, 12.0)

# Task Is Due
mock_time.side_effect = [60, 72, 72]

schedule_entry_due = PanoptesScheduleEntry(
name='Test Task', task='',
total_run_count=0, schedule=timedelta(seconds=60),
kv_store=self.mock_kv_store)

self.assertEqual(schedule_entry_due.run_at, 72.0)
schedstate = schedule_entry_due.is_due()

self.assertEqual(schedstate.is_due, True)
self.assertEqual(schedstate.next, 60.0)
self.assertEqual(self.mock_kv_store.get('plugin_metadata:Test Task:last_uniformly_scheduled'), '72')

# Verify Next Updates The Correct Fields
schedule_entry_due = next(schedule_entry_due)
self.assertEqual(schedule_entry_due.total_run_count, 1)
self.assertEqual(schedule_entry_due.uniformly_scheduled, True)
self.assertIsNotNone(schedule_entry_due.last_run_at)

@patch('time.time')
def test_scheduled_before_and_overdue(self, mock_time):
"""
Verify that a new instance of the scheduler attempts to pick up where the previous
scheduler left off. In this case it is unable as too much time has passed since
the last uniformly scheduled key was set.
"""
mock_time.return_value = 100

schedule_entry = PanoptesScheduleEntry(
name='Test Task', task='',
total_run_count=0, schedule=timedelta(seconds=60),
kv_store=self.mock_kv_store,
last_uniformly_scheduled_at='30')

self.assertEqual(schedule_entry.run_at, 112.0)
schedstate = schedule_entry.is_due()

self.assertEqual(schedstate.is_due, False)
self.assertEqual(schedstate.next, 12.0)

@patch('time.time')
def test_scheduled_before_and_not_overdue(self, mock_time):
"""
Verify that a new instance of the scheduler can pick up where
the previous one left off.
"""
mock_time.return_value = 70

schedule_entry = PanoptesScheduleEntry(
name='Test Task', task='',
total_run_count=0, schedule=timedelta(seconds=60),
kv_store=self.mock_kv_store,
last_uniformly_scheduled_at='60')

self.assertEqual(schedule_entry.run_at, 120.0)
schedstate = schedule_entry.is_due()

self.assertEqual(schedstate.is_due, False)
self.assertEqual(schedstate.next, 50.0)

def test_schedule_entry_unique_identifier_does_not_throw(self):

self.assertEqual(
PanoptesScheduleEntry.schedule_entry_unique_identifier(timedelta(seconds=60), ('Test', 1, 1.0, b'bytes')),
"60.0|Test|1|1.0|b'bytes'"
)

def test_invalid_key_in_redis_is_handled(self):

schedule_entry = PanoptesScheduleEntry(
name='Test Task', task='',
total_run_count=0, schedule=timedelta(seconds=60),
kv_store=self.mock_kv_store,
last_uniformly_scheduled_at='60f')

schedstate = schedule_entry.is_due()
self.assertEqual(schedstate.is_due, False)
self.assertEqual(schedstate.next, 60)
7 changes: 4 additions & 3 deletions yahoo_panoptes/discovery/discovery_plugin_scheduler.py
Expand Up @@ -72,14 +72,15 @@ def __init__(self):
app_name=const.DISCOVERY_PLUGIN_SCHEDULER_CELERY_APP_NAME)


def discovery_plugin_scheduler_task(celery_beat_service):
def discovery_plugin_scheduler_task(celery_beat_service, iteration_count=0):
"""
This function is the workhorse of the Discovery Plugin Scheduler module. It detects changes in plugins and their
configuration and updates the Celery Beat schedule accordingly.
Args:
celery_beat_service (celery.beat.Service): The Celery Beat Service object associated with this Plugin Scheduler
iteration_count (int): The number of times the scheduler task has been called. The count is tracked by the
PanoptesTourOfDuty class inside of the PanoptesPluginScheduler class.
Returns:
None
"""
Expand Down Expand Up @@ -141,7 +142,7 @@ def discovery_plugin_scheduler_task(celery_beat_service):

try:
scheduler = celery_beat_service.scheduler
scheduler.update(logger, new_schedule)
scheduler.update(logger, new_schedule, called_by_panoptes=True)

end_time = time.time()
logger.info(u'Scheduled %d tasks in %.2fs' % (len(new_schedule), end_time - start_time))
Expand Down
7 changes: 4 additions & 3 deletions yahoo_panoptes/enrichment/enrichment_plugin_scheduler.py
Expand Up @@ -80,15 +80,16 @@ def __init__(self):
__init__(app_name=const.ENRICHMENT_PLUGIN_SCHEDULER_CELERY_APP_NAME)


def enrichment_plugin_scheduler_task(celery_beat_service):
def enrichment_plugin_scheduler_task(celery_beat_service, iteration_count=0):
"""
This function is the workhorse of the Enrichment Plugin Scheduler module. It detects changes in plugins and their
configuration and updates the Celery Beat schedule accordingly.
Args:
celery_beat_service (celery.beat.Service): The Celery Beat Service instance associated with this Plugin\
Scheduler
iteration_count (int): The number of times the scheduler task has been called. The count is tracked by the
PanoptesTourOfDuty class inside of the PanoptesPluginScheduler class.
Returns:
None
Expand Down Expand Up @@ -184,7 +185,7 @@ def enrichment_plugin_scheduler_task(celery_beat_service):

try:
scheduler = celery_beat_service.scheduler
scheduler.update(logger, new_schedule)
scheduler.update(logger, new_schedule, called_by_panoptes=True)
end_time = time.time()
logger.info(u'Scheduled %d tasks in %.2fs' % (len(new_schedule), end_time - start_time))

Expand Down

0 comments on commit 3ec851f

Please sign in to comment.