Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #1141 from peterbe/bug851184-job_list-smarter-order

bug 851184 - job_list smarter order
  • Loading branch information...
commit 2576c06e882ec624b876bb8bbd7a0cfec2540d93 2 parents 504ac2b + b1e5196
@lonnen lonnen authored
View
70 socorro/cron/base.py
@@ -1,6 +1,8 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import collections
import datetime
import re
@@ -12,6 +14,71 @@ class FrequencyDefinitionError(Exception):
pass
+class CircularDAGError(Exception):
+ pass
+
+
+def reorder_dag(sequence,
+ depends_getter=lambda x: x.depends_on,
+ name_getter=lambda x: x.app_name,
+ impatience_max=100):
+ """
+ DAG = Directed Acyclic Graph
+ If we have something like:
+ C depends on B
+ B depends on A
+ A doesn't depend on any
+
+ Given the order of [C, B, A] expect it to return [A, B, C]
+
+ parameters:
+
+ :sequence: some sort of iterable list
+
+ :depends_getter: a callable that extracts the depends on sub-list
+
+ :name_getter: a callable that extracts the name
+
+ :impatience_max: a max count that is reached before we end up in
+ an infinite loop.
+ """
+
+ ordered_jobs = []
+ ordered_jobs_set = set()
+
+ jobs = collections.defaultdict(list)
+ map_ = {}
+ _count_roots = 0
+ for each in sequence:
+ name = name_getter(each)
+ depends_on = depends_getter(each)
+ if depends_on is None:
+ depends_on = []
+ elif isinstance(depends_on, tuple):
+ depends_on = list(depends_on)
+ elif not isinstance(depends_on, list):
+ depends_on = [depends_on]
+ if not depends_on:
+ _count_roots += 1
+ jobs[name] += depends_on
+ map_[name] = each
+
+ if not _count_roots:
+ raise CircularDAGError("No job is at the root")
+ count = 0
+ while len(ordered_jobs) < len(jobs.keys()):
+ for job, deps in jobs.iteritems():
+ if job in ordered_jobs_set:
+ continue
+ if not set(deps).issubset(ordered_jobs_set):
+ continue
+ ordered_jobs.append(job)
+ ordered_jobs_set = set(ordered_jobs)
+ count += 1
+ if count > impatience_max:
+ raise CircularDAGError("Circular reference somewhere")
+
+ return [map_[x] for x in ordered_jobs]
def convert_frequency(frequency):
@@ -35,9 +102,6 @@ def convert_frequency(frequency):
return number
-
-
-
class BaseCronApp(RequiredConfig):
"""The base class from which Socorro apps are based"""
required_config = Namespace()
View
32 socorro/cron/crontabber.py
@@ -24,7 +24,8 @@
from socorro.cron.base import (
convert_frequency,
FrequencyDefinitionError,
- BaseBackfillCronApp
+ BaseBackfillCronApp,
+ reorder_dag
)
@@ -510,6 +511,18 @@ def main(self):
self.run_all()
return 0
+ @staticmethod
+ def _reorder_class_list(class_list):
+ # class_list looks something like this:
+ # [('FooBarJob', <class 'FooBarJob'>),
+ # ('BarJob', <class 'BarJob'>),
+ # ('FooJob', <class 'FooJob'>)]
+ return reorder_dag(
+ class_list,
+ depends_getter=lambda x: getattr(x[1], 'depends_on', None),
+ name_getter=lambda x: x[1].app_name
+ )
+
@property
def database(self):
if not getattr(self, '_database', None):
@@ -621,7 +634,9 @@ def reset_job(self, description):
"""remove the job from the state.
if means that next time we run, this job will start over from scratch.
"""
- for class_name, job_class in self.config.crontabber.jobs.class_list:
+ class_list = self.config.crontabber.jobs.class_list
+ class_list = self._reorder_class_list(class_list)
+ for class_name, job_class in class_list:
if (
job_class.app_name == description or
description == job_class.__module__ + '.' + job_class.__name__
@@ -636,14 +651,18 @@ def reset_job(self, description):
raise JobNotFoundError(description)
def run_all(self):
- for class_name, job_class in self.config.crontabber.jobs.class_list:
+ class_list = self.config.crontabber.jobs.class_list
+ class_list = self._reorder_class_list(class_list)
+ for class_name, job_class in class_list:
class_config = self.config.crontabber['class-%s' % class_name]
self._run_one(job_class, class_config)
def run_one(self, description, force=False):
# the description in this case is either the app_name or the full
# module/class reference
- for class_name, job_class in self.config.crontabber.jobs.class_list:
+ class_list = self.config.crontabber.jobs.class_list
+ class_list = self._reorder_class_list(class_list)
+ for class_name, job_class in class_list:
if (
job_class.app_name == description or
description == job_class.__module__ + '.' + job_class.__name__
@@ -791,7 +810,10 @@ def configtest(self):
"""return true if all configured jobs are configured OK"""
# similar to run_all() but don't actually run them
failed = 0
- for class_name, __ in self.config.crontabber.jobs.class_list:
+
+ class_list = self.config.crontabber.jobs.class_list
+ class_list = self._reorder_class_list(class_list)
+ for class_name, __ in class_list:
class_config = self.config.crontabber['class-%s' % class_name]
if not self._configtest_one(class_config):
failed += 1
View
12 socorro/unittest/cron/jobs/test_matviews.py
@@ -112,18 +112,6 @@ def mock_utc_now():
# regular jobs writing only once.
self.assertEqual(self.psycopg2().commit.call_count, 13 * 2 + 2)
- def test_reports_clean_dependency_prerequisite(self):
- config_manager, json_file = self._setup_config_manager(
- 'socorro.cron.jobs.matviews.ReportsCleanCronApp|1d'
- )
-
- with config_manager.context() as config:
- tab = crontabber.CronTabber(config)
- tab.run_all()
-
- # no file is created because it's unable to run anything
- self.assertTrue(not os.path.isfile(json_file))
-
def test_reports_clean_with_dependency(self):
config_manager, json_file = self._setup_config_manager(
'socorro.cron.jobs.matviews.DuplicatesCronApp|1h\n'
View
144 socorro/unittest/cron/test_crontabber.py
@@ -8,6 +8,7 @@
import os
import json
import time
+import unittest
from cStringIO import StringIO
import mock
import psycopg2
@@ -21,6 +22,108 @@
#==============================================================================
+class _Item(object):
+
+ def __init__(self, name, depends_on):
+ self.app_name = name
+ self.depends_on = depends_on
+
+
+class TestReordering(unittest.TestCase):
+
+ def test_basic_already_right(self):
+ sequence = [
+ _Item('A', []),
+ _Item('B', ['A']),
+ _Item('C', ['B']),
+ ]
+ new_sequence = base.reorder_dag(sequence)
+ new_names = [x.app_name for x in new_sequence]
+ self.assertEqual(new_names, ['A', 'B', 'C'])
+
+ def test_three_levels(self):
+ sequence = [
+ _Item('A', []),
+ _Item('B', ['A']),
+ _Item('D', ['B', 'C']),
+ _Item('C', ['B']),
+
+ ]
+ new_sequence = base.reorder_dag(sequence)
+ new_names = [x.app_name for x in new_sequence]
+ self.assertEqual(new_names, ['A', 'B', 'C', 'D'])
+
+ def test_basic_completely_reversed(self):
+ sequence = [
+ _Item('C', ['B']),
+ _Item('B', ['A']),
+ _Item('A', []),
+ ]
+ new_sequence = base.reorder_dag(sequence)
+ new_names = [x.app_name for x in new_sequence]
+ self.assertEqual(new_names, ['A', 'B', 'C'])
+
+ def test_basic_sloppy_depends_on(self):
+ sequence = [
+ _Item('C', ('B',)),
+ _Item('B', 'A'),
+ _Item('A', None),
+ ]
+ new_sequence = base.reorder_dag(sequence)
+ new_names = [x.app_name for x in new_sequence]
+ self.assertEqual(new_names, ['A', 'B', 'C'])
+
+ def test_two_trees(self):
+ sequence = [
+ _Item('C', ['B']),
+ _Item('B', ['A']),
+ _Item('A', []),
+ _Item('X', ['Y']),
+ _Item('Y', []),
+ ]
+ new_sequence = base.reorder_dag(sequence)
+ new_names = [x.app_name for x in new_sequence]
+ self.assertTrue(
+ new_names.index('A')
+ <
+ new_names.index('B')
+ <
+ new_names.index('C')
+ )
+ self.assertTrue(
+ new_names.index('Y')
+ <
+ new_names.index('X')
+ )
+
+ def test_circular_no_roots(self):
+ sequence = [
+ _Item('C', ['B']),
+ _Item('B', ['A']),
+ _Item('A', ['C']),
+ ]
+ self.assertRaises(
+ base.CircularDAGError,
+ base.reorder_dag,
+ sequence
+ )
+
+ def test_circular_one_root_still_circular(self):
+ sequence = [
+ _Item('C', ['B']),
+ _Item('X', ['Y']),
+ _Item('Y', []),
+ _Item('B', ['A']),
+ _Item('A', ['C']),
+ ]
+ self.assertRaises(
+ base.CircularDAGError,
+ base.reorder_dag,
+ sequence
+ )
+
+
+#==============================================================================
class TestJSONJobsDatabase(TestCaseBase):
"""This has nothing to do with Socorro actually. It's just tests for the
underlying JSON database.
@@ -319,7 +422,8 @@ def test_run_all_with_failing_dependency(self):
config_manager, json_file = self._setup_config_manager(
'socorro.unittest.cron.test_crontabber.TroubleJob|1d\n'
'socorro.unittest.cron.test_crontabber.SadJob|1d\n'
- 'socorro.unittest.cron.test_crontabber.BasicJob|1d'
+ 'socorro.unittest.cron.test_crontabber.BasicJob|1d\n'
+ 'socorro.unittest.cron.test_crontabber.FooJob|1d'
)
with config_manager.context() as config:
@@ -328,7 +432,10 @@ def test_run_all_with_failing_dependency(self):
infos = [x[0][0] for x in config.logger.info.call_args_list]
infos = [x for x in infos if x.startswith('Ran ')]
- self.assertEqual(infos, ['Ran TroubleJob', 'Ran BasicJob'])
+ self.assertEqual(
+ infos,
+ ['Ran BasicJob', 'Ran TroubleJob', 'Ran FooJob']
+ )
# note how SadJob couldn't be run!
# let's see what information we have
assert os.path.isfile(json_file)
@@ -366,11 +473,10 @@ def test_run_all_basic_with_failing_dependency_without_errors(self):
# hasn't never run
with config_manager.context() as config:
tab = crontabber.CronTabber(config)
- tab.run_all()
-
- infos = [x[0][0] for x in config.logger.info.call_args_list]
- infos = [x for x in infos if x.startswith('Ran ')]
- self.assertEqual(infos, [])
+ self.assertRaises(
+ base.CircularDAGError,
+ tab.run_all
+ )
def test_run_all_with_failing_dependency_without_errors_but_old(self):
config_manager, json_file = self._setup_config_manager(
@@ -1369,6 +1475,30 @@ def test_nagios_critical(self):
self.assertTrue('NameError' in output)
self.assertTrue('Trouble!!' in output)
+ def test_reorder_dag_on_joblist(self):
+ config_manager, json_file = self._setup_config_manager(
+ 'socorro.unittest.cron.test_crontabber.FooBarJob|1d\n'
+ 'socorro.unittest.cron.test_crontabber.BarJob|1d\n'
+ 'socorro.unittest.cron.test_crontabber.FooJob|1d'
+ )
+ # looking at the dependencies, since FooJob doesn't depend on anything
+ # it should be run first, then BarJob and lastly FooBarJob because
+ # FooBarJob depends on FooJob and BarJob.
+ with config_manager.context() as config:
+ tab = crontabber.CronTabber(config)
+ tab.run_all()
+ structure = json.load(open(json_file))
+ self.assertTrue('foo' in structure)
+ self.assertTrue('bar' in structure)
+ self.assertTrue('foobar' in structure)
+ self.assertTrue(
+ structure['foo']['last_run']
+ <
+ structure['bar']['last_run']
+ <
+ structure['foobar']['last_run']
+ )
+
#==============================================================================
@attr(integration='postgres') # for nosetests
Please sign in to comment.
Something went wrong with that request. Please try again.