Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

bug 851184 - job_list smarter order

  • Loading branch information...
commit b24f159f88db0cbfb9dbcfd7f8a9785e6386eac4 1 parent 855c5c7
Peter Bengtsson authored
70  socorro/cron/base.py
... ...
@@ -1,6 +1,8 @@
1 1
 # This Source Code Form is subject to the terms of the Mozilla Public
2 2
 # License, v. 2.0. If a copy of the MPL was not distributed with this
3 3
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4
+
  5
+import collections
4 6
 import datetime
5 7
 import re
6 8
 
@@ -12,6 +14,71 @@ class FrequencyDefinitionError(Exception):
12 14
     pass
13 15
 
14 16
 
  17
+class CircularDAGError(Exception):
  18
+    pass
  19
+
  20
+
  21
+def reorder_dag(sequence,
  22
+                depends_getter=lambda x: x.depends_on,
  23
+                name_getter=lambda x: x.app_name,
  24
+                impatience_max=100):
  25
+    """
  26
+    DAG = Directed Acyclic Graph
  27
+    If we have something like:
  28
+        C depends on B
  29
+        B depends on A
  30
+        A doesn't depend on anything
  31
+
  32
+    Given the order of [C, B, A] expect it to return [A, B, C]
  33
+
  34
+    parameters:
  35
+
  36
+        :sequence: some sort of iterable list
  37
+
  38
+        :depends_getter: a callable that extracts the depends on sub-list
  39
+
  40
+        :name_getter: a callable that extracts the name
  41
+
  42
+        :impatience_max: upper bound on passes over the jobs; once it is
  43
+                         exceeded CircularDAGError is raised (no endless loop).
  44
+    """
  45
+
  46
+    ordered_jobs = []
  47
+    ordered_jobs_set = set()
  48
+
  49
+    jobs = collections.defaultdict(list)
  50
+    map_ = {}
  51
+    _count_roots = 0
  52
+    for each in sequence:
  53
+        name = name_getter(each)
  54
+        depends_on = depends_getter(each)
  55
+        if depends_on is None:
  56
+            depends_on = []
  57
+        elif isinstance(depends_on, tuple):
  58
+            depends_on = list(depends_on)
  59
+        elif not isinstance(depends_on, list):
  60
+            depends_on = [depends_on]
  61
+        if not depends_on:
  62
+            _count_roots += 1
  63
+        jobs[name] += depends_on
  64
+        map_[name] = each
  65
+
  66
+    if not _count_roots:
  67
+        raise CircularDAGError("No job is at the root")
  68
+    count = 0
  69
+    while len(ordered_jobs) < len(jobs.keys()):
  70
+        for job, deps in jobs.iteritems():
  71
+            if job in ordered_jobs_set:
  72
+                continue
  73
+            if not set(deps).issubset(ordered_jobs_set):
  74
+                continue
  75
+            ordered_jobs.append(job)
  76
+            ordered_jobs_set = set(ordered_jobs)
  77
+        count += 1
  78
+        if count > impatience_max:
  79
+            raise CircularDAGError("Circular reference somewhere")
  80
+
  81
+    return [map_[x] for x in ordered_jobs]
15 82
 
16 83
 
17 84
 def convert_frequency(frequency):
@@ -35,9 +102,6 @@ def convert_frequency(frequency):
35 102
     return number
36 103
 
37 104
 
38  
-
39  
-
40  
-
41 105
 class BaseCronApp(RequiredConfig):
42 106
     """The base class from which Socorro apps are based"""
43 107
     required_config = Namespace()
32  socorro/cron/crontabber.py
@@ -24,7 +24,8 @@
24 24
 from socorro.cron.base import (
25 25
     convert_frequency,
26 26
     FrequencyDefinitionError,
27  
-    BaseBackfillCronApp
  27
+    BaseBackfillCronApp,
  28
+    reorder_dag
28 29
 )
29 30
 
30 31
 
@@ -510,6 +511,18 @@ def main(self):
510 511
             self.run_all()
511 512
         return 0
512 513
 
  514
+    @staticmethod
  515
+    def _reorder_class_list(class_list):
  516
+        # class_list looks something like this:
  517
+        # [('FooBarJob', <class 'FooBarJob'>),
  518
+        #  ('BarJob', <class 'BarJob'>),
  519
+        #  ('FooJob', <class 'FooJob'>)]
  520
+        return reorder_dag(
  521
+            class_list,
  522
+            depends_getter=lambda x: getattr(x[1], 'depends_on', None),
  523
+            name_getter=lambda x: x[1].app_name
  524
+        )
  525
+
513 526
     @property
514 527
     def database(self):
515 528
         if not getattr(self, '_database', None):
@@ -621,7 +634,9 @@ def reset_job(self, description):
621 634
         """remove the job from the state.
622 635
         it means that next time we run, this job will start over from scratch.
623 636
         """
624  
-        for class_name, job_class in self.config.crontabber.jobs.class_list:
  637
+        class_list = self.config.crontabber.jobs.class_list
  638
+        class_list = self._reorder_class_list(class_list)
  639
+        for class_name, job_class in class_list:
625 640
             if (
626 641
                 job_class.app_name == description or
627 642
                 description == job_class.__module__ + '.' + job_class.__name__
@@ -636,14 +651,18 @@ def reset_job(self, description):
636 651
         raise JobNotFoundError(description)
637 652
 
638 653
     def run_all(self):
639  
-        for class_name, job_class in self.config.crontabber.jobs.class_list:
  654
+        class_list = self.config.crontabber.jobs.class_list
  655
+        class_list = self._reorder_class_list(class_list)
  656
+        for class_name, job_class in class_list:
640 657
             class_config = self.config.crontabber['class-%s' % class_name]
641 658
             self._run_one(job_class, class_config)
642 659
 
643 660
     def run_one(self, description, force=False):
644 661
         # the description in this case is either the app_name or the full
645 662
         # module/class reference
646  
-        for class_name, job_class in self.config.crontabber.jobs.class_list:
  663
+        class_list = self.config.crontabber.jobs.class_list
  664
+        class_list = self._reorder_class_list(class_list)
  665
+        for class_name, job_class in class_list:
647 666
             if (
648 667
                 job_class.app_name == description or
649 668
                 description == job_class.__module__ + '.' + job_class.__name__
@@ -791,7 +810,10 @@ def configtest(self):
791 810
         """return true if all configured jobs are configured OK"""
792 811
         # similar to run_all() but don't actually run them
793 812
         failed = 0
794  
-        for class_name, __ in self.config.crontabber.jobs.class_list:
  813
+
  814
+        class_list = self.config.crontabber.jobs.class_list
  815
+        class_list = self._reorder_class_list(class_list)
  816
+        for class_name, __ in class_list:
795 817
             class_config = self.config.crontabber['class-%s' % class_name]
796 818
             if not self._configtest_one(class_config):
797 819
                 failed += 1
144  socorro/unittest/cron/test_crontabber.py
@@ -8,6 +8,7 @@
8 8
 import os
9 9
 import json
10 10
 import time
  11
+import unittest
11 12
 from cStringIO import StringIO
12 13
 import mock
13 14
 import psycopg2
@@ -21,6 +22,108 @@
21 22
 
22 23
 
23 24
 #==============================================================================
  25
+class _Item(object):
  26
+
  27
+    def __init__(self, name, depends_on):
  28
+        self.app_name = name
  29
+        self.depends_on = depends_on
  30
+
  31
+
  32
+class TestReordering(unittest.TestCase):
  33
+
  34
+    def test_basic_already_right(self):
  35
+        sequence = [
  36
+            _Item('A', []),
  37
+            _Item('B', ['A']),
  38
+            _Item('C', ['B']),
  39
+        ]
  40
+        new_sequence = base.reorder_dag(sequence)
  41
+        new_names = [x.app_name for x in new_sequence]
  42
+        self.assertEqual(new_names, ['A', 'B', 'C'])
  43
+
  44
+    def test_three_levels(self):
  45
+        sequence = [
  46
+            _Item('A', []),
  47
+            _Item('B', ['A']),
  48
+            _Item('D', ['B', 'C']),
  49
+            _Item('C', ['B']),
  50
+
  51
+        ]
  52
+        new_sequence = base.reorder_dag(sequence)
  53
+        new_names = [x.app_name for x in new_sequence]
  54
+        self.assertEqual(new_names, ['A', 'B', 'C', 'D'])
  55
+
  56
+    def test_basic_completely_reversed(self):
  57
+        sequence = [
  58
+            _Item('C', ['B']),
  59
+            _Item('B', ['A']),
  60
+            _Item('A', []),
  61
+        ]
  62
+        new_sequence = base.reorder_dag(sequence)
  63
+        new_names = [x.app_name for x in new_sequence]
  64
+        self.assertEqual(new_names, ['A', 'B', 'C'])
  65
+
  66
+    def test_basic_sloppy_depends_on(self):
  67
+        sequence = [
  68
+            _Item('C', ('B',)),
  69
+            _Item('B', 'A'),
  70
+            _Item('A', None),
  71
+        ]
  72
+        new_sequence = base.reorder_dag(sequence)
  73
+        new_names = [x.app_name for x in new_sequence]
  74
+        self.assertEqual(new_names, ['A', 'B', 'C'])
  75
+
  76
+    def test_two_trees(self):
  77
+        sequence = [
  78
+            _Item('C', ['B']),
  79
+            _Item('B', ['A']),
  80
+            _Item('A', []),
  81
+            _Item('X', ['Y']),
  82
+            _Item('Y', []),
  83
+        ]
  84
+        new_sequence = base.reorder_dag(sequence)
  85
+        new_names = [x.app_name for x in new_sequence]
  86
+        self.assertTrue(
  87
+            new_names.index('A')
  88
+            <
  89
+            new_names.index('B')
  90
+            <
  91
+            new_names.index('C')
  92
+        )
  93
+        self.assertTrue(
  94
+            new_names.index('Y')
  95
+            <
  96
+            new_names.index('X')
  97
+        )
  98
+
  99
+    def test_circular_no_roots(self):
  100
+        sequence = [
  101
+            _Item('C', ['B']),
  102
+            _Item('B', ['A']),
  103
+            _Item('A', ['C']),
  104
+        ]
  105
+        self.assertRaises(
  106
+            base.CircularDAGError,
  107
+            base.reorder_dag,
  108
+            sequence
  109
+        )
  110
+
  111
+    def test_circular_one_root_still_circular(self):
  112
+        sequence = [
  113
+            _Item('C', ['B']),
  114
+            _Item('X', ['Y']),
  115
+            _Item('Y', []),
  116
+            _Item('B', ['A']),
  117
+            _Item('A', ['C']),
  118
+        ]
  119
+        self.assertRaises(
  120
+            base.CircularDAGError,
  121
+            base.reorder_dag,
  122
+            sequence
  123
+        )
  124
+
  125
+
  126
+#==============================================================================
24 127
 class TestJSONJobsDatabase(TestCaseBase):
25 128
     """This has nothing to do with Socorro actually. It's just tests for the
26 129
     underlying JSON database.
@@ -319,7 +422,8 @@ def test_run_all_with_failing_dependency(self):
319 422
         config_manager, json_file = self._setup_config_manager(
320 423
             'socorro.unittest.cron.test_crontabber.TroubleJob|1d\n'
321 424
             'socorro.unittest.cron.test_crontabber.SadJob|1d\n'
322  
-            'socorro.unittest.cron.test_crontabber.BasicJob|1d'
  425
+            'socorro.unittest.cron.test_crontabber.BasicJob|1d\n'
  426
+            'socorro.unittest.cron.test_crontabber.FooJob|1d'
323 427
         )
324 428
 
325 429
         with config_manager.context() as config:
@@ -328,7 +432,10 @@ def test_run_all_with_failing_dependency(self):
328 432
 
329 433
             infos = [x[0][0] for x in config.logger.info.call_args_list]
330 434
             infos = [x for x in infos if x.startswith('Ran ')]
331  
-            self.assertEqual(infos, ['Ran TroubleJob', 'Ran BasicJob'])
  435
+            self.assertEqual(
  436
+                infos,
  437
+                ['Ran BasicJob', 'Ran TroubleJob', 'Ran FooJob']
  438
+            )
332 439
             # note how SadJob couldn't be run!
333 440
             # let's see what information we have
334 441
             assert os.path.isfile(json_file)
@@ -366,11 +473,10 @@ def test_run_all_basic_with_failing_dependency_without_errors(self):
366 473
         # has never run
367 474
         with config_manager.context() as config:
368 475
             tab = crontabber.CronTabber(config)
369  
-            tab.run_all()
370  
-
371  
-            infos = [x[0][0] for x in config.logger.info.call_args_list]
372  
-            infos = [x for x in infos if x.startswith('Ran ')]
373  
-            self.assertEqual(infos, [])
  476
+            self.assertRaises(
  477
+                base.CircularDAGError,
  478
+                tab.run_all
  479
+            )
374 480
 
375 481
     def test_run_all_with_failing_dependency_without_errors_but_old(self):
376 482
         config_manager, json_file = self._setup_config_manager(
@@ -1369,6 +1475,30 @@ def test_nagios_critical(self):
1369 1475
             self.assertTrue('NameError' in output)
1370 1476
             self.assertTrue('Trouble!!' in output)
1371 1477
 
  1478
+    def test_reorder_dag_on_joblist(self):
  1479
+        config_manager, json_file = self._setup_config_manager(
  1480
+            'socorro.unittest.cron.test_crontabber.FooBarJob|1d\n'
  1481
+            'socorro.unittest.cron.test_crontabber.BarJob|1d\n'
  1482
+            'socorro.unittest.cron.test_crontabber.FooJob|1d'
  1483
+        )
  1484
+        # looking at the dependencies, since FooJob doesn't depend on anything
  1485
+        # it should be run first, then BarJob and lastly FooBarJob because
  1486
+        # FooBarJob depends on FooJob and BarJob.
  1487
+        with config_manager.context() as config:
  1488
+            tab = crontabber.CronTabber(config)
  1489
+            tab.run_all()
  1490
+            structure = json.load(open(json_file))
  1491
+            self.assertTrue('foo' in structure)
  1492
+            self.assertTrue('bar' in structure)
  1493
+            self.assertTrue('foobar' in structure)
  1494
+            self.assertTrue(
  1495
+                structure['foo']['last_run']
  1496
+                <
  1497
+                structure['bar']['last_run']
  1498
+                <
  1499
+                structure['foobar']['last_run']
  1500
+            )
  1501
+
1372 1502
 
1373 1503
 #==============================================================================
1374 1504
 @attr(integration='postgres')  # for nosetests

0 notes on commit b24f159

Please sign in to comment.
Something went wrong with that request. Please try again.