Add Datadog contrib for monitoring purpose #2434

Merged
merged 14 commits into from Dec 17, 2018
28 changes: 28 additions & 0 deletions doc/configuration.rst
@@ -815,6 +815,12 @@ send_messages
the central scheduler provides a simple prompt per task to send messages.
Defaults to true.

metrics_collector
  Optional setting that lets Luigi use a contrib module to report metrics
  about the pipeline to a third-party service. By default this uses a no-op
  metrics collector that does nothing. The only other currently available
  option is "datadog".
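
  For example, to enable the Datadog collector, set this option under the
  "[scheduler]" section of your luigi configuration (a minimal sketch,
  assuming the Datadog contrib and its dependencies are installed):

    [scheduler]
    metrics_collector = datadog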


[sendgrid]
----------
@@ -988,6 +994,28 @@ client_type
authentication. The other option is the "kerberos" client that uses kerberos
authentication.

[datadog]
---------

api_key
  The API key found in the account settings of Datadog, under the API
  section.
app_key
  The application key found in the account settings of Datadog, under the
  API section.
default_tags
  Optional setting that adds the given tags to all the metrics and events
  sent to Datadog. Default value is "application:luigi".
environment
  Allows you to specify the environment the pipeline runs in, to
  differentiate between production, staging or development metrics within
  Datadog. Default value is "development".
statsd_host
  The host running the statsd instance to which Datadog's statsd metrics are
  sent. Default value is "localhost".
statsd_port
  The port on which the statsd host accepts connections. Default value is
  8125.
metric_namespace
  Optional prefix added to the beginning of every metric sent to Datadog.
  Default value is "luigi".
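
A minimal example of this section in a luigi configuration might look like
the following sketch; the values shown are placeholders to be replaced with
your own account and statsd details:

  [datadog]
  api_key = your_datadog_api_key
  app_key = your_datadog_app_key
  default_tags = application:luigi
  environment = production
  statsd_host = localhost
  statsd_port = 8125
  metric_namespace = luigi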

Per Task Retry-Policy
---------------------
127 changes: 127 additions & 0 deletions luigi/contrib/datadog_metric.py
@@ -0,0 +1,127 @@
import logging

from luigi import parameter
from luigi.metrics import MetricsCollector
from luigi.task import Config

logger = logging.getLogger('luigi-interface')

try:
from datadog import initialize, api, statsd
except ImportError:
logger.warning("Loading datadog module without datadog installed. Will crash at runtime if datadog functionality is used.")


class datadog(Config):
api_key = parameter.Parameter(default='dummy_api_key', description='API key provided by Datadog')
app_key = parameter.Parameter(default='dummy_app_key', description='APP key provided by Datadog')
    default_tags = parameter.Parameter(default='application:luigi', description='Default tags for every event and metric sent to Datadog')
    environment = parameter.Parameter(default='development', description="Environment in which the pipeline is run (eg: 'production', 'staging', ...)")
metric_namespace = parameter.Parameter(default='luigi', description="Default namespace for events and metrics (eg: 'luigi' for 'luigi.task.started')")
statsd_host = parameter.Parameter(default='localhost', description='StatsD host implementing the Datadog service')
statsd_port = parameter.IntParameter(default=8125, description='StatsD port implementing the Datadog service')


class DatadogMetricsCollector(MetricsCollector):
def __init__(self, *args, **kwargs):
self._config = datadog(**kwargs)

initialize(api_key=self._config.api_key,
app_key=self._config.app_key,
statsd_host=self._config.statsd_host,
statsd_port=self._config.statsd_port)

def handle_task_started(self, task):
title = "Luigi: A task has been started!"
text = "A task has been started in the pipeline named: {name}".format(name=task.family)
tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task)

self._send_increment('task.started', tags=tags)

event_tags = tags + ["task_state:STARTED"]
self._send_event(title=title, text=text, tags=event_tags, alert_type='info', priority='low')

def handle_task_failed(self, task):
title = "Luigi: A task has failed!"
text = "A task has failed in the pipeline named: {name}".format(name=task.family)
tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task)

self._send_increment('task.failed', tags=tags)

event_tags = tags + ["task_state:FAILED"]
self._send_event(title=title, text=text, tags=event_tags, alert_type='error', priority='normal')

def handle_task_disabled(self, task, config):
title = "Luigi: A task has been disabled!"
lines = ['A task has been disabled in the pipeline named: {name}.']
lines.append('The task has failed {failures} times in the last {window}')
lines.append('seconds, so it is being disabled for {persist} seconds.')

        preformatted_text = ' '.join(lines)

        text = preformatted_text.format(name=task.family,
                                        persist=config.disable_persist,
                                        failures=config.retry_count,
                                        window=config.disable_window)

tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task)

self._send_increment('task.disabled', tags=tags)

event_tags = tags + ["task_state:DISABLED"]
self._send_event(title=title, text=text, tags=event_tags, alert_type='error', priority='normal')

def handle_task_done(self, task):
# The task is already done -- Let's not re-create an event
if task.time_running is None:
return

title = "Luigi: A task has been completed!"
text = "A task has completed in the pipeline named: {name}".format(name=task.family)
tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task)

time_elapse = task.updated - task.time_running

self._send_increment('task.done', tags=tags)
self._send_gauge('task.execution_time', time_elapse, tags=tags)

event_tags = tags + ["task_state:DONE"]
self._send_event(title=title, text=text, tags=event_tags, alert_type='info', priority='low')

def _send_event(self, **params):
params['tags'] += self.default_tags

api.Event.create(**params)

def _send_gauge(self, metric_name, value, tags=[]):
all_tags = tags + self.default_tags

namespaced_metric = "{namespace}.{metric_name}".format(namespace=self._config.metric_namespace,
metric_name=metric_name)
statsd.gauge(namespaced_metric, value, tags=all_tags)

def _send_increment(self, metric_name, value=1, tags=[]):
all_tags = tags + self.default_tags

namespaced_metric = "{namespace}.{metric_name}".format(namespace=self._config.metric_namespace,
metric_name=metric_name)
statsd.increment(namespaced_metric, value, tags=all_tags)

def _format_task_params_to_tags(self, task):
params = []
for key, value in task.params.items():
params.append("{key}:{value}".format(key=key, value=value))

return params

@property
def default_tags(self):
default_tags = []

env_tag = "environment:{environment}".format(environment=self._config.environment)
default_tags.append(env_tag)

if self._config.default_tags:
default_tags = default_tags + str.split(self._config.default_tags, ',')

return default_tags
67 changes: 67 additions & 0 deletions luigi/metrics.py
@@ -0,0 +1,67 @@
import abc

from enum import Enum
from luigi import six


class MetricsCollectors(Enum):
default = 1
none = 1
datadog = 2

@classmethod
def get(cls, which):
if which == MetricsCollectors.none:
return NoMetricsCollector()
elif which == MetricsCollectors.datadog:
from luigi.contrib.datadog_metric import DatadogMetricsCollector
return DatadogMetricsCollector()
else:
            raise ValueError("MetricsCollectors value ' {0} ' isn't supported".format(which))


@six.add_metaclass(abc.ABCMeta)
class MetricsCollector(object):
    """Abstract MetricsCollector base class that can be replaced by a
    tool-specific implementation.
    """

@abc.abstractmethod
def __init__(self):
pass

@abc.abstractmethod
def handle_task_started(self, task):
pass

@abc.abstractmethod
def handle_task_failed(self, task):
pass

@abc.abstractmethod
def handle_task_disabled(self, task, config):
pass

@abc.abstractmethod
def handle_task_done(self, task):
pass


class NoMetricsCollector(MetricsCollector):
"""Empty MetricsCollector when no collector is being used
"""

def __init__(self):
pass

def handle_task_started(self, task):
pass

def handle_task_failed(self, task):
pass

def handle_task_disabled(self, task, config):
pass

def handle_task_done(self, task):
pass
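
For illustration only, a minimal custom collector built on the MetricsCollector
base class above might look like the following sketch. "PrintMetricsCollector"
is a hypothetical name and is not part of this pull request; the sketch only
relies on the task.family and config.retry_count attributes already used by the
Datadog collector:

from luigi.metrics import MetricsCollector


class PrintMetricsCollector(MetricsCollector):
    """Hypothetical collector that simply logs task events to stdout."""

    def __init__(self):
        pass

    def handle_task_started(self, task):
        print("task started: {}".format(task.family))

    def handle_task_failed(self, task):
        print("task failed: {}".format(task.family))

    def handle_task_disabled(self, task, config):
        print("task disabled: {} after {} failures".format(task.family, config.retry_count))

    def handle_task_done(self, task):
        print("task done: {}".format(task.family))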
29 changes: 29 additions & 0 deletions luigi/scheduler.py
@@ -50,6 +50,8 @@
from luigi.task import Config
from luigi.parameter import ParameterVisibility

from luigi.metrics import MetricsCollectors

logger = logging.getLogger(__name__)

UPSTREAM_RUNNING = 'UPSTREAM_RUNNING'
@@ -146,6 +148,8 @@ class scheduler(Config):

send_messages = parameter.BoolParameter(default=True)

metrics_collector = parameter.EnumParameter(enum=MetricsCollectors, default=MetricsCollectors.default)

def _get_retry_policy(self):
return RetryPolicy(self.retry_count, self.disable_hard_timeout, self.disable_window)

@@ -444,6 +448,7 @@ def __init__(self, state_path):
self._status_tasks = collections.defaultdict(dict)
self._active_workers = {} # map from id to a Worker object
self._task_batchers = {}
self._metrics_collector = None

def get_state(self):
return self._tasks, self._active_workers, self._task_batchers
@@ -562,9 +567,11 @@ def set_status(self, task, new_status, config=None):

if new_status == FAILED and task.status != DISABLED:
task.add_failure()
self.update_metrics_task_failed(task)
if task.has_excessive_failures():
task.scheduler_disable_time = time.time()
new_status = DISABLED
self.update_metrics_task_disabled(task, config)
if not config.batch_emails:
notifications.send_error_email(
'Luigi Scheduler: DISABLED {task} due to excessive failures'.format(task=task.id),
@@ -584,6 +591,9 @@ def set_status(self, task, new_status, config=None):
task.status = new_status
task.updated = time.time()

if new_status == DONE:
self.update_metrics_task_done(task)

if new_status == FAILED:
task.retry = time.time() + config.retry_delay
if remove_on_failure:
@@ -666,6 +676,18 @@ def disable_workers(self, worker_ids):
worker.disabled = True
worker.tasks.clear()

def update_metrics_task_started(self, task):
self._metrics_collector.handle_task_started(task)

def update_metrics_task_disabled(self, task, config):
self._metrics_collector.handle_task_disabled(task, config)

def update_metrics_task_failed(self, task):
self._metrics_collector.handle_task_failed(task)

def update_metrics_task_done(self, task):
self._metrics_collector.handle_task_done(task)


class Scheduler(object):
"""
@@ -699,6 +721,8 @@ def __init__(self, config=None, resources=None, task_history_impl=None, **kwargs
if self._config.batch_emails:
self._email_batcher = BatchNotifier()

self._state._metrics_collector = MetricsCollectors.get(self._config.metrics_collector)

def load(self):
self._state.load()

@@ -1226,6 +1250,7 @@ def get_work(self, host=None, assistant=False, current_tasks=None, worker=None,
reply['batch_task_ids'] = [task.id for task in batched_tasks]

elif best_task:
self.update_metrics_task_started(best_task)
self._state.set_status(best_task, RUNNING, self._config)
best_task.worker_running = worker_id
best_task.resources_running = best_task.resources.copy()
@@ -1619,3 +1644,7 @@ def _update_task_history(self, task, status, host=None):
def task_history(self):
# Used by server.py to expose the calls
return self._task_history

@rpc_method()
def update_metrics_task_started(self, task):
self._state._metrics_collector.handle_task_started(task)