diff --git a/test/contrib/datadog_metric_test.py b/test/contrib/datadog_metric_test.py index f68a67d514..bb7e75a1fb 100644 --- a/test/contrib/datadog_metric_test.py +++ b/test/contrib/datadog_metric_test.py @@ -1,11 +1,9 @@ # -*- coding: utf-8 -*- -from helpers import unittest, with_config +from helpers import unittest import mock import time -import luigi.notifications - from luigi.contrib.datadog_metric import DatadogMetricsCollector from luigi.metrics import MetricsCollectors from luigi.scheduler import Scheduler @@ -15,23 +13,29 @@ class DatadogMetricTest(unittest.TestCase): def setUp(self): - self.time = time.time self.mockDatadog() + self.time = time.time self.collector = DatadogMetricsCollector() - self.s = Scheduler(metrics_collector=MetricsCollectors.datadog) - self.s.add_task(worker=WORKER, task_id='DDTaskID', family='DDTaskName') - self.task = self.s._state.get_task('DDTaskID') - self.task.time_running = 0 def tearDown(self): - self.create_patcher.stop() - self.increment_patcher.stop() - self.gauge_patcher.stop() + self.unMockDatadog() if time.time != self.time: time.time = self.time + def startTask(self, scheduler=None): + if scheduler: + s = scheduler + else: + s = self.s + + s.add_task(worker=WORKER, task_id='DDTaskID', family='DDTaskName') + task = s._state.get_task('DDTaskID') + + task.time_running = 0 + return task + def mockDatadog(self): self.create_patcher = mock.patch('datadog.api.Event.create') self.mock_create = self.create_patcher.start() @@ -42,106 +46,106 @@ def mockDatadog(self): self.gauge_patcher = mock.patch('datadog.statsd.gauge') self.mock_gauge = self.gauge_patcher.start() + def unMockDatadog(self): + self.create_patcher.stop() + self.increment_patcher.stop() + self.gauge_patcher.stop() + def setTime(self, t): time.time = lambda: t - @mock.patch('datadog.api.Event.create') - def test_send_event_on_task_started(self, create_dd_event): - self.collector.handle_task_started(self.task) + def test_send_event_on_task_started(self): + task = self.startTask() + self.collector.handle_task_started(task) + + self.mock_create.assert_called_once_with(alert_type='info', + priority='low', + tags=['task_name:DDTaskName', + 'task_state:STARTED', + 'environment:development', + 'application:luigi'], + text='A task has been started in the pipeline named: DDTaskName', + title='Luigi: A task has been started!') - create_dd_event.assert_called_once_with(alert_type='info', - priority='low', - tags=['task_name:DDTaskName', - 'task_state:STARTED', - 'environment:development', - 'application:luigi'], - text='A task has been started in the pipeline named: DDTaskName', - title='Luigi: A task has been started!') + def test_send_increment_on_task_started(self): + task = self.startTask() + self.collector.handle_task_started(task) - @mock.patch('datadog.statsd.increment') - def test_send_increment_on_task_started(self, increment_dd_counter): - self.collector.handle_task_started(self.task) + self.mock_increment.assert_called_once_with('luigi.task.started', 1, tags=['task_name:DDTaskName', + 'environment:development', + 'application:luigi']) - increment_dd_counter.assert_called_once_with('luigi.task.started', 1, tags=['task_name:DDTaskName', + def test_send_event_on_task_failed(self): + task = self.startTask() + self.collector.handle_task_failed(task) + + self.mock_create.assert_called_once_with(alert_type='error', + priority='normal', + tags=['task_name:DDTaskName', + 'task_state:FAILED', + 'environment:development', + 'application:luigi'], + text='A task has failed in the pipeline named: DDTaskName', + title='Luigi: A task has failed!') + + def test_send_increment_on_task_failed(self): + task = self.startTask() + self.collector.handle_task_failed(task) + + self.mock_increment.assert_called_once_with('luigi.task.failed', 1, tags=['task_name:DDTaskName', + 'environment:development', + 'application:luigi']) + + def test_send_event_on_task_disabled(self): + s = Scheduler(metrics_collector=MetricsCollectors.datadog, disable_persist=10, retry_count=2, disable_window=2) + task = self.startTask(scheduler=s) + self.collector.handle_task_disabled(task, s._config) + + self.mock_create.assert_called_once_with(alert_type='error', + priority='normal', + tags=['task_name:DDTaskName', + 'task_state:DISABLED', + 'environment:development', + 'application:luigi'], + text='A task has been disabled in the pipeline named: DDTaskName. ' + + 'The task has failed 2 times in the last 2 seconds' + + ', so it is being disabled for 10 seconds.', + title='Luigi: A task has been disabled!') + + def test_send_increment_on_task_disabled(self): + task = self.startTask() + self.collector.handle_task_disabled(task, self.s._config) + + self.mock_increment.assert_called_once_with('luigi.task.disabled', 1, tags=['task_name:DDTaskName', 'environment:development', 'application:luigi']) - @mock.patch('datadog.api.Event.create') - def test_send_event_on_task_failed(self, create_dd_event): - self.collector.handle_task_failed(self.task) + def test_send_event_on_task_done(self): + task = self.startTask() + self.collector.handle_task_done(task) - create_dd_event.assert_called_once_with(alert_type='error', - priority='normal', - tags=['task_name:DDTaskName', - 'task_state:FAILED', - 'environment:development', - 'application:luigi'], - text='A task has failed in the pipeline named: DDTaskName', - title='Luigi: A task has failed!') + self.mock_create.assert_called_once_with(alert_type='info', + priority='low', + tags=['task_name:DDTaskName', + 'task_state:DONE', + 'environment:development', + 'application:luigi'], + text='A task has completed in the pipeline named: DDTaskName', + title='Luigi: A task has been completed!') - @mock.patch('datadog.statsd.increment') - def test_send_increment_on_task_failed(self, increment_dd_counter): - self.collector.handle_task_failed(self.task) + def test_send_increment_on_task_done(self): + task = self.startTask() + self.collector.handle_task_done(task) - increment_dd_counter.assert_called_once_with('luigi.task.failed', 1, tags=['task_name:DDTaskName', - 'environment:development', - 'application:luigi']) + self.mock_increment.assert_called_once_with('luigi.task.done', 1, tags=['task_name:DDTaskName', + 'environment:development', + 'application:luigi']) - @mock.patch('datadog.api.Event.create') - @with_config({'scheduler': {'metrics_collector': 'datadog', 'disable_persist': '10', 'retry_count': '2', 'disable_window': '2'}}) - def test_send_event_on_task_disabled(self, create_dd_event): - sch = luigi.scheduler.Scheduler() - self.collector.handle_task_disabled(self.task, sch._config) - - create_dd_event.assert_called_once_with(alert_type='error', - priority='normal', - tags=['task_name:DDTaskName', - 'task_state:DISABLED', - 'environment:development', - 'application:luigi'], - text='A task has been disabled in the pipeline named: DDTaskName. ' + - 'The task has failed 2 times in the last 2 seconds' + - ', so it is being disabled for 10 seconds.', - title='Luigi: A task has been disabled!') - - @mock.patch('datadog.statsd.increment') - def test_send_increment_on_task_disabled(self, increment_dd_counter): - self.collector.handle_task_disabled(self.task, self.s._config) - - increment_dd_counter.assert_called_once_with('luigi.task.disabled', 1, tags=['task_name:DDTaskName', - 'environment:development', - 'application:luigi']) - - @mock.patch('datadog.api.Event.create') - def test_send_event_on_task_done(self, create_dd_event): - self.collector.handle_task_done(self.task) - - create_dd_event.assert_called_once_with(alert_type='info', - priority='low', - tags=['task_name:DDTaskName', - 'task_state:DONE', - 'environment:development', - 'application:luigi'], - text='A task has completed in the pipeline named: DDTaskName', - title='Luigi: A task has been completed!') - - @mock.patch('datadog.statsd.increment') - def test_send_increment_on_task_done(self, increment_dd_counter): - self.collector.handle_task_done(self.task) - - increment_dd_counter.assert_called_once_with('luigi.task.done', 1, tags=['task_name:DDTaskName', - 'environment:development', - 'application:luigi']) - - @mock.patch('datadog.statsd.gauge') - def test_send_gauge_on_task_done(self, gauge_dd): + def test_send_gauge_on_task_done(self): self.setTime(0) - self.s.add_task(worker=WORKER, task_id='DDTaskIDFrozen', family='DDTaskNameFrozen') - frozen_task = self.s._state.get_task('DDTaskIDFrozen') - frozen_task.time_running = 0 - - self.collector.handle_task_done(frozen_task) + task = self.startTask() + self.collector.handle_task_done(task) - gauge_dd.assert_called_once_with('luigi.task.execution_time', 0, tags=['task_name:DDTaskNameFrozen', - 'environment:development', - 'application:luigi']) + self.mock_gauge.assert_called_once_with('luigi.task.execution_time', 0, tags=['task_name:DDTaskName', + 'environment:development', + 'application:luigi'])