Skip to content

Commit

Permalink
Allow for a custom MetricsCollector class to be loaded at runtime (#3146)
Browse files Browse the repository at this point in the history

* Change metrics.get to allow for custom imports

* Flake8 & Docs Fixes

Co-authored-by: Dillon Stadther <dlstadther+github@gmail.com>
  • Loading branch information
nipper and dlstadther committed Mar 12, 2022
1 parent 096fb98 commit d9450d4
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 3 deletions.
7 changes: 6 additions & 1 deletion doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,12 @@ metrics_collector
Optional setting allowing Luigi to use a contribution to collect metrics
about the pipeline and send them to a third party. By default this uses the
default metrics collector, which acts as a shell and does nothing. The currently available
options are "datadog" and "prometheus".
options are "datadog", "prometheus" and "custom". If it is set to "custom",
'metrics_custom_import' must also be set.

metrics_custom_import
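Optional setting allowing Luigi to import a custom subclass of MetricsCollector
at runtime. It is only used when metrics_collector is set to "custom". The string
should be a dotted import path of the form "module.sub_module.ClassName", where
ClassName is a subclass of MetricsCollector.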


[sendgrid]
Expand Down
20 changes: 19 additions & 1 deletion luigi/metrics.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import abc
import importlib

from enum import Enum


class MetricsCollectors(Enum):
custom = -1
default = 1
none = 1
datadog = 2
prometheus = 3

@classmethod
def get(cls, which):
def get(cls, which, custom_import=None):
if which == MetricsCollectors.none:
return NoMetricsCollector()
elif which == MetricsCollectors.datadog:
Expand All @@ -19,6 +21,22 @@ def get(cls, which):
elif which == MetricsCollectors.prometheus:
from luigi.contrib.prometheus_metric import PrometheusMetricsCollector
return PrometheusMetricsCollector()
elif which == MetricsCollectors.custom:
if custom_import is None:
raise ValueError(f"MetricsCollectors value ' {which} ' is -1 and custom_import is None")

split_import_string = custom_import.split(".")

import_path = ".".join(split_import_string[:-1])
import_class_string = split_import_string[-1]

mod = importlib.import_module(import_path)
metrics_class = getattr(mod, import_class_string)

if issubclass(metrics_class, MetricsCollector):
return metrics_class()
else:
raise ValueError(f"Custom Import: {custom_import} is not a subclass of MetricsCollector")
else:
raise ValueError("MetricsCollectors value ' {0} ' isn't supported", which)

Expand Down
3 changes: 2 additions & 1 deletion luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ class scheduler(Config):
send_messages = parameter.BoolParameter(default=True)

metrics_collector = parameter.EnumParameter(enum=MetricsCollectors, default=MetricsCollectors.default)
metrics_custom_import = parameter.OptionalStrParameter(default=None)

stable_done_cooldown_secs = parameter.IntParameter(default=10,
description="Sets cooldown period to avoid running the same task twice")
Expand Down Expand Up @@ -695,7 +696,7 @@ def __init__(self, config=None, resources=None, task_history_impl=None, **kwargs
if self._config.batch_emails:
self._email_batcher = BatchNotifier()

self._state._metrics_collector = MetricsCollectors.get(self._config.metrics_collector)
self._state._metrics_collector = MetricsCollectors.get(self._config.metrics_collector, self._config.metrics_custom_import)

def load(self):
self._state.load()
Expand Down
9 changes: 9 additions & 0 deletions test/scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,15 @@ def test_prometheus_metrics_collector(self):
collector = scheduler_state._metrics_collector
self.assertTrue(isinstance(collector, PrometheusMetricsCollector))

@with_config({
    'scheduler': {
        'metrics_collector': 'custom',
        'metrics_custom_import': 'luigi.contrib.prometheus_metric.PrometheusMetricsCollector',
    },
})
def test_custom_metrics_collector(self):
    """Check that the 'custom' collector option loads the class named by the import path.

    The scheduler config selects ``metrics_collector = custom`` and points
    ``metrics_custom_import`` at the dotted path of
    ``PrometheusMetricsCollector``; the scheduler state must then hold an
    instance of that class.
    """
    # Imported locally, as in the original test, so the contrib module is
    # only loaded when this test actually runs.
    from luigi.contrib.prometheus_metric import PrometheusMetricsCollector

    collector = luigi.scheduler.Scheduler()._state._metrics_collector
    # assertIsInstance reports the actual type on failure, unlike
    # assertTrue(isinstance(...)), which only says "False is not true".
    self.assertIsInstance(collector, PrometheusMetricsCollector)


class SchedulerWorkerTest(unittest.TestCase):
def get_pending_ids(self, worker, state):
Expand Down

0 comments on commit d9450d4

Please sign in to comment.