Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for a custom MetricsCollector class to be loaded at runtime #3146

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 6 additions & 1 deletion doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,12 @@ metrics_collector
Optional setting allowing Luigi to use a contrib module to collect metrics
about the pipeline and send them to a third party. By default this uses the
default metrics collector, which acts as a shell and does nothing. The currently available
options are "datadog" and "prometheus".
options are "datadog", "prometheus" and "custom". If set to "custom", the
'metrics_custom_import' option must also be set.

metrics_custom_import
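Optional setting allowing Luigi to import a custom subclass of MetricsCollector
at runtime. It is only used when 'metrics_collector' is set to "custom". The
string should be a dotted import path ending in the class name, formatted like
"module.sub_module.ClassName"; the named class must be a subclass of
MetricsCollector.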


[sendgrid]
Expand Down
20 changes: 19 additions & 1 deletion luigi/metrics.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import abc
import importlib

from enum import Enum


class MetricsCollectors(Enum):
custom = -1
default = 1
none = 1
datadog = 2
prometheus = 3

@classmethod
def get(cls, which):
def get(cls, which, custom_import=None):
if which == MetricsCollectors.none:
return NoMetricsCollector()
elif which == MetricsCollectors.datadog:
Expand All @@ -19,6 +21,22 @@ def get(cls, which):
elif which == MetricsCollectors.prometheus:
from luigi.contrib.prometheus_metric import PrometheusMetricsCollector
return PrometheusMetricsCollector()
elif which == MetricsCollectors.custom:
if custom_import is None:
raise ValueError(f"MetricsCollectors value ' {which} ' is -1 and custom_import is None")

split_import_string = custom_import.split(".")

import_path = ".".join(split_import_string[:-1])
import_class_string = split_import_string[-1]

mod = importlib.import_module(import_path)
metrics_class = getattr(mod, import_class_string)

if issubclass(metrics_class, MetricsCollector):
return metrics_class()
else:
raise ValueError(f"Custom Import: {custom_import} is not a subclass of MetricsCollector")
else:
raise ValueError("MetricsCollectors value ' {0} ' isn't supported", which)

Expand Down
3 changes: 2 additions & 1 deletion luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ class scheduler(Config):
send_messages = parameter.BoolParameter(default=True)

metrics_collector = parameter.EnumParameter(enum=MetricsCollectors, default=MetricsCollectors.default)
metrics_custom_import = parameter.OptionalStrParameter(default=None)

stable_done_cooldown_secs = parameter.IntParameter(default=10,
description="Sets cooldown period to avoid running the same task twice")
Expand Down Expand Up @@ -695,7 +696,7 @@ def __init__(self, config=None, resources=None, task_history_impl=None, **kwargs
if self._config.batch_emails:
self._email_batcher = BatchNotifier()

self._state._metrics_collector = MetricsCollectors.get(self._config.metrics_collector)
self._state._metrics_collector = MetricsCollectors.get(self._config.metrics_collector, self._config.metrics_custom_import)

def load(self):
self._state.load()
Expand Down
9 changes: 9 additions & 0 deletions test/scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,15 @@ def test_prometheus_metrics_collector(self):
collector = scheduler_state._metrics_collector
self.assertTrue(isinstance(collector, PrometheusMetricsCollector))

@with_config({
    'scheduler': {
        'metrics_collector': 'custom',
        'metrics_custom_import': 'luigi.contrib.prometheus_metric.PrometheusMetricsCollector',
    },
})
def test_custom_metrics_collector(self):
    """Check that the 'custom' collector option loads the configured class.

    The scheduler is configured with ``metrics_collector = custom`` and a
    ``metrics_custom_import`` path naming ``PrometheusMetricsCollector``;
    the collector stored on the scheduler state must be an instance of it.
    """
    from luigi.contrib.prometheus_metric import PrometheusMetricsCollector

    # Build a scheduler under the patched config and read the collector
    # straight off its internal state object.
    loaded_collector = luigi.scheduler.Scheduler()._state._metrics_collector
    self.assertTrue(isinstance(loaded_collector, PrometheusMetricsCollector))


class SchedulerWorkerTest(unittest.TestCase):
def get_pending_ids(self, worker, state):
Expand Down