Skip to content

Commit

Permalink
Fix spec issues related to new tests
Browse files Browse the repository at this point in the history
There was multiple problems that needed to be solved in order to get the
specs green again. Each individual specs were passing when ran
individually, but when ran into tox as a group, some of them would pass
and other would fail depending the tox environment.

It came to my attention that the time function of this file, was
creating an issue with other specs because we were not tearDowning it as
expected. Also, using setTime within the setUp group had side effects
with unexpected behaviors.

Then, the way way that the task_id and task_family was named was also
causing problems with the same spec that were failing prior.  I'm unsure
why this would be the case, but changing either fail, but changing both
makes the spec to green.

Finally, the last spec would always fail because the setTime was set
AFTER the task was actually being run, which would always cause the
execution time to be greater than 0.

My understanding of all of this is still a bit fuzzy, but hey, now the
spec suite passes.
  • Loading branch information
thisiscab committed Dec 13, 2018
1 parent a67f2af commit e8fafd5
Showing 1 changed file with 30 additions and 23 deletions.
53 changes: 30 additions & 23 deletions test/contrib/datadog_metric_test.py
Expand Up @@ -7,29 +7,31 @@
import luigi.notifications

from luigi.contrib.datadog_metric import DatadogMetricsCollector
from luigi.metrics import MetricsCollectors
from luigi.scheduler import Scheduler

luigi.notifications.DEBUG = True
WORKER = 'myworker'


class DatadogNotificationTest(unittest.TestCase):
@with_config({'scheduler': {'metrics_collector': 'datadog'}})
class DatadogMetricTest(unittest.TestCase):
def setUp(self):
self.time = time.time
self.mockDatadog()
self.collector = DatadogMetricsCollector()

self.setTime(0)
self.s = Scheduler()
self.s.add_task(worker=WORKER, task_id='A', family='MyTask')
self.task = self.s._state.get_task('A')
self.s = Scheduler(metrics_collector=MetricsCollectors.datadog)
self.s.add_task(worker=WORKER, task_id='DDTaskID', family='DDTaskName')
self.task = self.s._state.get_task('DDTaskID')
self.task.time_running = 0

def tearDown(self):
self.create_patcher.stop()
self.increment_patcher.stop()
self.gauge_patcher.stop()

if time.time != self.time:
time.time = self.time

def mockDatadog(self):
self.create_patcher = mock.patch('datadog.api.Event.create')
self.mock_create = self.create_patcher.start()
Expand All @@ -49,18 +51,18 @@ def test_send_event_on_task_started(self, create_dd_event):

create_dd_event.assert_called_once_with(alert_type='info',
priority='low',
tags=['task_name:MyTask',
tags=['task_name:DDTaskName',
'task_state:STARTED',
'environment:development',
'application:luigi'],
text='A task has been started in the pipeline named: MyTask',
text='A task has been started in the pipeline named: DDTaskName',
title='Luigi: A task has been started!')

@mock.patch('datadog.statsd.increment')
def test_send_increment_on_task_started(self, increment_dd_counter):
self.collector.handle_task_started(self.task)

increment_dd_counter.assert_called_once_with('luigi.task.started', 1, tags=['task_name:MyTask',
increment_dd_counter.assert_called_once_with('luigi.task.started', 1, tags=['task_name:DDTaskName',
'environment:development',
'application:luigi'])

Expand All @@ -70,43 +72,43 @@ def test_send_event_on_task_failed(self, create_dd_event):

create_dd_event.assert_called_once_with(alert_type='error',
priority='normal',
tags=['task_name:MyTask',
tags=['task_name:DDTaskName',
'task_state:FAILED',
'environment:development',
'application:luigi'],
text='A task has failed in the pipeline named: MyTask',
text='A task has failed in the pipeline named: DDTaskName',
title='Luigi: A task has failed!')

@mock.patch('datadog.statsd.increment')
def test_send_increment_on_task_failed(self, increment_dd_counter):
self.collector.handle_task_failed(self.task)

increment_dd_counter.assert_called_once_with('luigi.task.failed', 1, tags=['task_name:MyTask',
increment_dd_counter.assert_called_once_with('luigi.task.failed', 1, tags=['task_name:DDTaskName',
'environment:development',
'application:luigi'])

@mock.patch('datadog.api.Event.create')
@with_config({'scheduler': {'metrics_collector': 'datadog', 'disable_persist': '10', 'retry_count': '5', 'disable-window-seconds': '2'}})
@with_config({'scheduler': {'metrics_collector': 'datadog', 'disable_persist': '10', 'retry_count': '2', 'disable_window': '2'}})
def test_send_event_on_task_disabled(self, create_dd_event):
sch = luigi.scheduler.Scheduler()
self.collector.handle_task_disabled(self.task, sch._config)

create_dd_event.assert_called_once_with(alert_type='error',
priority='normal',
tags=['task_name:MyTask',
tags=['task_name:DDTaskName',
'task_state:DISABLED',
'environment:development',
'application:luigi'],
text='A task has been disabled in the pipeline named: MyTask. ' +
'The task has failed 5 times in the last 2 seconds' +
text='A task has been disabled in the pipeline named: DDTaskName. ' +
'The task has failed 2 times in the last 2 seconds' +
', so it is being disabled for 10 seconds.',
title='Luigi: A task has been disabled!')

@mock.patch('datadog.statsd.increment')
def test_send_increment_on_task_disabled(self, increment_dd_counter):
self.collector.handle_task_disabled(self.task, self.s._config)

increment_dd_counter.assert_called_once_with('luigi.task.disabled', 1, tags=['task_name:MyTask',
increment_dd_counter.assert_called_once_with('luigi.task.disabled', 1, tags=['task_name:DDTaskName',
'environment:development',
'application:luigi'])

Expand All @@ -116,25 +118,30 @@ def test_send_event_on_task_done(self, create_dd_event):

create_dd_event.assert_called_once_with(alert_type='info',
priority='low',
tags=['task_name:MyTask',
tags=['task_name:DDTaskName',
'task_state:DONE',
'environment:development',
'application:luigi'],
text='A task has completed in the pipeline named: MyTask',
text='A task has completed in the pipeline named: DDTaskName',
title='Luigi: A task has been completed!')

@mock.patch('datadog.statsd.increment')
def test_send_increment_on_task_done(self, increment_dd_counter):
self.collector.handle_task_done(self.task)

increment_dd_counter.assert_called_once_with('luigi.task.done', 1, tags=['task_name:MyTask',
increment_dd_counter.assert_called_once_with('luigi.task.done', 1, tags=['task_name:DDTaskName',
'environment:development',
'application:luigi'])

@mock.patch('datadog.statsd.gauge')
def test_send_gauge_on_task_done(self, gauge_dd):
self.collector.handle_task_done(self.task)
self.setTime(0)
self.s.add_task(worker=WORKER, task_id='DDTaskIDFrozen', family='DDTaskNameFrozen')
frozen_task = self.s._state.get_task('DDTaskIDFrozen')
frozen_task.time_running = 0

self.collector.handle_task_done(frozen_task)

gauge_dd.assert_called_once_with('luigi.task.execution_time', 0, tags=['task_name:MyTask',
gauge_dd.assert_called_once_with('luigi.task.execution_time', 0, tags=['task_name:DDTaskNameFrozen',
'environment:development',
'application:luigi'])

0 comments on commit e8fafd5

Please sign in to comment.