/
enrichment_plugin_scheduler.py
260 lines (202 loc) · 10.6 KB
/
enrichment_plugin_scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
"""
Copyright 2018, Oath Inc.
Licensed under the terms of the Apache 2.0 license. See LICENSE file in project root for terms.

This module implements the Enrichment Plugin Scheduler, which finds any enrichment plugins in the configured path and
then installs/updates a Celery Beat schedule for each enrichment plugin and configuration combination.

If the list or the configuration of the plugins changes, the schedule is updated.

This module is expected to be imported and executed through the Celery 'beat' command line tool.

Internally, two threads are set up: one is the main thread that runs the Celery Beat service. The other is the
thread started by the Enrichment Plugin Scheduler to detect and update plugin/configuration changes.
"""
import faulthandler
import sys
import time
from resource import getrusage, RUSAGE_SELF
from datetime import datetime, timedelta
from celery.signals import beat_init
from yahoo_panoptes.framework import const
from yahoo_panoptes.framework.exceptions import PanoptesBaseException
from yahoo_panoptes.framework.context import PanoptesContext
from yahoo_panoptes.framework.resources import PanoptesResourcesKeyValueStore, PanoptesResourceCache
from yahoo_panoptes.framework.celery_manager import PanoptesCeleryConfig
from yahoo_panoptes.framework.plugins.helpers import expires, time_limit
from yahoo_panoptes.framework.plugins.manager import PanoptesPluginManager
from yahoo_panoptes.framework.plugins.panoptes_base_plugin import PanoptesPluginConfigurationError
from yahoo_panoptes.framework.plugins.scheduler import PanoptesPluginScheduler
from yahoo_panoptes.framework.utilities.helpers import get_calling_module_name, inspect_calling_module_for_name
from yahoo_panoptes.framework.utilities.key_value_store import PanoptesKeyValueStore
from yahoo_panoptes.enrichment.enrichment_plugin import PanoptesEnrichmentPlugin, PanoptesEnrichmentPluginInfo
from yahoo_panoptes.enrichment.enrichment_plugin_agent import PanoptesEnrichmentPluginAgentKeyValueStore
# Module-level state. All four names start out unset and are populated by start_enrichment_plugin_scheduler():
#   panoptes_context            - the PanoptesEnrichmentPluginSchedulerContext
#   enrichment_plugin_scheduler - the PanoptesPluginScheduler driving enrichment_plugin_scheduler_task
#   celery                      - the value returned by enrichment_plugin_scheduler.start()
#   logger                      - the logger taken from panoptes_context
panoptes_context = enrichment_plugin_scheduler = celery = logger = None
class PanoptesEnrichmentPluginSchedulerError(PanoptesBaseException):
    """
    Exception type for errors that are specific to the Enrichment Plugin Scheduler
    """
class PanoptesEnrichmentSchedulerKeyValueStore(PanoptesKeyValueStore):
    """
    A custom Key/Value store bound to the namespace demarcated for the Enrichment Plugin Scheduler
    """

    def __init__(self, panoptes_context):
        namespace = const.ENRICHMENT_PLUGIN_SCHEDULER_KEY_VALUE_NAMESPACE
        super(PanoptesEnrichmentSchedulerKeyValueStore, self).__init__(panoptes_context, namespace)
class PanoptesEnrichmentPluginSchedulerContext(PanoptesContext):
    """
    A PanoptesContext for the Enrichment Plugin Scheduler. It is created with:
        Key/Value Stores for the Enrichment Plugin Scheduler, the Enrichment Plugin Agent and Resources
        A Zookeeper Client
    No message producer is created.
    """

    def __init__(self):
        kv_store_classes = [
            PanoptesEnrichmentSchedulerKeyValueStore,
            PanoptesEnrichmentPluginAgentKeyValueStore,
            PanoptesResourcesKeyValueStore,
        ]
        super(PanoptesEnrichmentPluginSchedulerContext, self).__init__(
            key_value_store_class_list=kv_store_classes,
            create_message_producer=False,
            create_zookeeper_client=True,
        )
class PanoptesCeleryEnrichmentAgentConfig(PanoptesCeleryConfig):
    """
    Celery configuration for the Enrichment Plugin Scheduler app.

    Tasks named after the Enrichment Plugin Agent module are routed to the queue named after the Enrichment Plugin
    Agent Celery app.
    """
    task_routes = {
        const.ENRICHMENT_PLUGIN_AGENT_MODULE_NAME: {
            u'queue': const.ENRICHMENT_PLUGIN_AGENT_CELERY_APP_NAME
        }
    }

    def __init__(self):
        scheduler_app_name = const.ENRICHMENT_PLUGIN_SCHEDULER_CELERY_APP_NAME
        super(PanoptesCeleryEnrichmentAgentConfig, self).__init__(app_name=scheduler_app_name)
def enrichment_plugin_scheduler_task(celery_beat_service, iteration_count=0):
    """
    This function is the workhorse of the Enrichment Plugin Scheduler module. It detects changes in plugins and their
    configuration and updates the Celery Beat schedule accordingly.

    Each cycle:
        1. Sets up a resource cache - the cycle is skipped if this fails
        2. Loads the enrichment plugins - the cycle is skipped (after closing the resource cache) if this fails
        3. Builds one Celery Beat entry per (plugin, matching resource) pair
        4. Closes the resource cache and unloads the plugin modules - this happens even if step 3 raises
        5. Passes the new schedule to the Celery Beat scheduler's update() - errors here are logged, not raised

    Args:
        celery_beat_service (celery.beat.Service): The Celery Beat Service instance associated with this Plugin
            Scheduler
        iteration_count (int): The number of times the scheduler task has been called. The count is tracked by the
            PanoptesTourOfDuty class inside of the PanoptesPluginScheduler class. Not used by this function.

    Returns:
        None
    """
    logger.info(u'Timezone: %s' % str(celery_beat_service.app.timezone))

    start_time = time.time()

    try:
        resource_cache = PanoptesResourceCache(panoptes_context)
        resource_cache.setup_resource_cache()
    except Exception:
        logger.exception(u'Could not create resource cache, skipping cycle')
        return

    try:
        plugin_manager = PanoptesPluginManager(
            plugin_type=u'enrichment',
            plugin_class=PanoptesEnrichmentPlugin,
            plugin_info_class=PanoptesEnrichmentPluginInfo,
            panoptes_context=panoptes_context,
            kv_store_class=PanoptesEnrichmentPluginAgentKeyValueStore
        )
        plugins = plugin_manager.getPluginsOfCategory(category_name=u'enrichment')
        logger.info(u'Found %d plugins' % len(plugins))
    except Exception:
        logger.exception('Error trying to load enrichment plugins, skipping cycle')
        # The resource cache was set up above - release it rather than leaking it on this early return
        resource_cache.close_resource_cache()
        return

    try:
        new_schedule = _build_enrichment_schedule(plugins, resource_cache)
    finally:
        # Release the resource cache and unload the plugin modules even if building the schedule raised, so neither
        # leaks into the next cycle. The nested try keeps the unload from being skipped if closing the cache fails.
        try:
            resource_cache.close_resource_cache()
        finally:
            logger.info(u'Going to unload plugin modules. Length of sys.modules before unloading modules: %d'
                        % len(sys.modules))
            plugin_manager.unload_modules()
            logger.info(u'Unloaded plugin modules. Length of sys.modules after unloading modules: %d'
                        % len(sys.modules))

    try:
        scheduler = celery_beat_service.scheduler
        scheduler.update(logger, new_schedule, called_by_panoptes=True)
        end_time = time.time()
        logger.info(u'Scheduled %d tasks in %.2fs' % (len(new_schedule), end_time - start_time))
    except Exception:
        logger.exception(u'Error in updating schedule for Enrichment Plugins')

    # ru_maxrss is the peak resident set size of this process (reported in kilobytes on Linux)
    logger.info(u'RSS memory: %dKB' % getrusage(RUSAGE_SELF).ru_maxrss)


def _build_enrichment_schedule(plugins, resource_cache):
    """
    Build the Celery Beat schedule entries for the given enrichment plugins.

    A plugin is skipped, with a log message, in any of these cases:
        - its configuration cannot be read
        - its execute frequency is not positive
        - it has no resource filter
        - its resource filter cannot be applied
        - its resource filter matches no resources
    Every other plugin gets one entry per matching resource.

    Args:
        plugins: The enrichment plugins returned by PanoptesPluginManager.getPluginsOfCategory
        resource_cache (PanoptesResourceCache): A resource cache, already set up, used to apply each plugin's
            resource filter

    Returns:
        dict: Maps a task name (normalized plugin name, plugin signature and resource id joined by ':') to its Celery
        Beat schedule entry
    """
    new_schedule = dict()

    for plugin in plugins:
        logger.info(u'Found plugin "%s", version %s at %s ' % (plugin.name, plugin.version, plugin.path))

        try:
            logger.info(u'Plugin "%s" has configuration: %s' % (plugin.name, plugin.config))
            logger.info(u'Plugin %s has plugin module time %s (UTC) and config mtime %s (UTC)' % (
                plugin.name, plugin.moduleMtime, plugin.configMtime))

            if plugin.execute_frequency <= 0:
                logger.info(u'Plugin %s has an invalid execution frequency (%d), skipping plugin' % (
                    plugin.name, plugin.execute_frequency))
                continue

            if not plugin.resource_filter:
                logger.info(u'Plugin "%s" does not have any resource filter specified, skipping plugin' %
                            plugin.name)
                continue
        except PanoptesPluginConfigurationError as e:
            logger.error(u'Error reading/parsing configuration for plugin "%s", skipping plugin. Error: %s' %
                         (plugin.name, repr(e)))
            # Actually skip, as the message says. Without this, a plugin with a broken configuration would go on to be
            # filtered and scheduled, and a later access to its configuration could abort the whole cycle.
            continue

        try:
            resource_set = resource_cache.get_resources(plugin.resource_filter)
        except Exception as e:
            logger.info(u'Error in applying resource filter "%s" for plugin "%s", skipping plugin: %s' % (
                plugin.resource_filter, plugin.name, repr(e)))
            continue

        if len(resource_set) == 0:
            logger.info(
                u'No resources found for plugin "%s" after applying resource filter "%s", skipping plugin' % (
                    plugin.name, plugin.resource_filter))
            continue

        logger.info(u'Length of resource set {} for plugin {}'.format(len(resource_set), plugin.name))

        for resource in resource_set:
            logger.debug(u'Going to add task for plugin "%s" with execute frequency %d, args "%s", resources %s' % (
                plugin.name, plugin.execute_frequency, plugin.config, resource))

            plugin.data = resource

            # One schedule entry per (plugin, resource) pair, keyed so that a change in plugin signature or resource
            # produces a distinct task name
            task_name = u':'.join([plugin.normalized_name, plugin.signature, str(resource.resource_id)])

            new_schedule[task_name] = {
                u'task': const.ENRICHMENT_PLUGIN_AGENT_MODULE_NAME,
                u'schedule': timedelta(seconds=plugin.execute_frequency),
                u'last_run_at': datetime.utcfromtimestamp(plugin.last_executed),
                u'args': (plugin.name, resource.serialization_key),
                u'options': {
                    u'expires': expires(plugin),
                    u'time_limit': time_limit(plugin)
                }
            }

    return new_schedule
def start_enrichment_plugin_scheduler():
    """
    The entry point for the Enrichment Plugin Scheduler.

    This method creates a Panoptes Context and the Celery Instance for the Enrichment Plugin Scheduler. It stores them
    in the module-level globals, together with the Plugin Scheduler and the context's logger. It calls sys.exit with
    an error message if the context, the Celery config or the Plugin Scheduler cannot be created. It does the same if
    the Celery Beat Service cannot be started.

    Returns:
        None
    """
    # Only names this function assigns need to be declared global
    global enrichment_plugin_scheduler, celery, logger, panoptes_context

    try:
        panoptes_context = PanoptesEnrichmentPluginSchedulerContext()
    except Exception as e:
        sys.exit(u'Could not create a Panoptes Context: %s' % (repr(e)))

    try:
        celery_config = PanoptesCeleryEnrichmentAgentConfig()
    except Exception as e:
        sys.exit(u'Could not create a Celery Config object: %s' % repr(e))

    # Guarded like the two constructions above, so a failure exits with a clear message instead of a bare traceback
    try:
        enrichment_plugin_scheduler = PanoptesPluginScheduler(
            panoptes_context=panoptes_context,
            plugin_type=u'enrichment',
            plugin_type_display_name=u'Enrichment',
            celery_config=celery_config,
            lock_timeout=const.ENRICHMENT_PLUGIN_SCHEDULER_LOCK_ACQUIRE_TIMEOUT,
            plugin_scheduler_task=enrichment_plugin_scheduler_task)
    except Exception as e:
        sys.exit(u'Could not create a Plugin Scheduler object: %s' % repr(e))

    logger = panoptes_context.logger

    celery = enrichment_plugin_scheduler.start()

    if not celery:
        sys.exit(u'Could not start Celery Beat Service')
@beat_init.connect
def celery_beat_service_started(sender=None, args=None, **kwargs):
    """
    Handler for Celery's beat_init signal, called after Celery Beat instantiates its service.

    Attaches the module's Panoptes context and the enrichment task prefix to the Beat scheduler. It then calls run()
    on the Enrichment Plugin Scheduler with the service.

    Args:
        sender (celery.beat.Service): The Celery Beat Service which was started by Celery Beat
        args: Arguments supplied with the signal; forwarded unchanged to the Plugin Scheduler's run()
        **kwargs (dict): Keyword Arguments; forwarded unchanged to the Plugin Scheduler's run()

    Returns:
        None
    """
    # enrichment_plugin_scheduler and panoptes_context are only read here, so no global declaration is needed
    beat_scheduler = sender.scheduler
    beat_scheduler.panoptes_context = panoptes_context
    beat_scheduler.task_prefix = const.ENRICHMENT_PLUGIN_SCHEDULER_CELERY_TASK_PREFIX

    enrichment_plugin_scheduler.run(sender, args, **kwargs)
# This guard ensures that the Enrichment Plugin Scheduler only executes when called from Celery. It prevents execution
# when this module is imported from other modules (like Sphinx) or called from the command line.
if (get_calling_module_name() == const.CELERY_LOADER_MODULE or
        inspect_calling_module_for_name('celery')):  # pragma: no cover
    faulthandler.enable()
    start_enrichment_plugin_scheduler()