Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

bug 851184 - job_list smarter order #1141

Merged
merged 2 commits into from

2 participants

Peter Bengtsson Chris Lonnen
Peter Bengtsson
Owner

See the bug for a longer description.
https://bugzilla.mozilla.org/show_bug.cgi?id=851184#c0

@lonnen r?

Chris Lonnen lonnen was assigned
Chris Lonnen lonnen merged commit 2576c06 into from
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Mar 14, 2013
  1. Peter Bengtsson
Commits on Mar 15, 2013
  1. Peter Bengtsson
This page is out of date. Refresh to see the latest.
70 socorro/cron/base.py
View
@@ -1,6 +1,8 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import collections
import datetime
import re
@@ -12,6 +14,71 @@ class FrequencyDefinitionError(Exception):
pass
+class CircularDAGError(Exception):
+ pass
+
+
+def reorder_dag(sequence,
+ depends_getter=lambda x: x.depends_on,
+ name_getter=lambda x: x.app_name,
+ impatience_max=100):
+ """
+ DAG = Directed Acyclic Graph
+ If we have something like:
+ C depends on B
+ B depends on A
+ A doesn't depend on any
+
+ Given the order of [C, B, A] expect it to return [A, B, C]
+
+ parameters:
+
+ :sequence: some sort of iterable list
+
+ :depends_getter: a callable that extracts the depends on sub-list
+
+ :name_getter: a callable that extracts the name
+
+ :impatience_max: a max count that is reached before we end up in
+ an infinite loop.
+ """
+
+ ordered_jobs = []
+ ordered_jobs_set = set()
+
+ jobs = collections.defaultdict(list)
+ map_ = {}
+ _count_roots = 0
+ for each in sequence:
+ name = name_getter(each)
+ depends_on = depends_getter(each)
+ if depends_on is None:
+ depends_on = []
+ elif isinstance(depends_on, tuple):
+ depends_on = list(depends_on)
+ elif not isinstance(depends_on, list):
+ depends_on = [depends_on]
+ if not depends_on:
+ _count_roots += 1
+ jobs[name] += depends_on
+ map_[name] = each
+
+ if not _count_roots:
+ raise CircularDAGError("No job is at the root")
+ count = 0
+ while len(ordered_jobs) < len(jobs.keys()):
+ for job, deps in jobs.iteritems():
+ if job in ordered_jobs_set:
+ continue
+ if not set(deps).issubset(ordered_jobs_set):
+ continue
+ ordered_jobs.append(job)
+ ordered_jobs_set = set(ordered_jobs)
+ count += 1
+ if count > impatience_max:
+ raise CircularDAGError("Circular reference somewhere")
+
+ return [map_[x] for x in ordered_jobs]
def convert_frequency(frequency):
@@ -35,9 +102,6 @@ def convert_frequency(frequency):
return number
-
-
-
class BaseCronApp(RequiredConfig):
"""The base class from which Socorro apps are based"""
required_config = Namespace()
32 socorro/cron/crontabber.py
View
@@ -24,7 +24,8 @@
from socorro.cron.base import (
convert_frequency,
FrequencyDefinitionError,
- BaseBackfillCronApp
+ BaseBackfillCronApp,
+ reorder_dag
)
@@ -510,6 +511,18 @@ def main(self):
self.run_all()
return 0
+ @staticmethod
+ def _reorder_class_list(class_list):
+ # class_list looks something like this:
+ # [('FooBarJob', <class 'FooBarJob'>),
+ # ('BarJob', <class 'BarJob'>),
+ # ('FooJob', <class 'FooJob'>)]
+ return reorder_dag(
+ class_list,
+ depends_getter=lambda x: getattr(x[1], 'depends_on', None),
+ name_getter=lambda x: x[1].app_name
+ )
+
@property
def database(self):
if not getattr(self, '_database', None):
@@ -621,7 +634,9 @@ def reset_job(self, description):
"""remove the job from the state.
if means that next time we run, this job will start over from scratch.
"""
- for class_name, job_class in self.config.crontabber.jobs.class_list:
+ class_list = self.config.crontabber.jobs.class_list
+ class_list = self._reorder_class_list(class_list)
+ for class_name, job_class in class_list:
if (
job_class.app_name == description or
description == job_class.__module__ + '.' + job_class.__name__
@@ -636,14 +651,18 @@ def reset_job(self, description):
raise JobNotFoundError(description)
def run_all(self):
- for class_name, job_class in self.config.crontabber.jobs.class_list:
+ class_list = self.config.crontabber.jobs.class_list
+ class_list = self._reorder_class_list(class_list)
+ for class_name, job_class in class_list:
class_config = self.config.crontabber['class-%s' % class_name]
self._run_one(job_class, class_config)
def run_one(self, description, force=False):
# the description in this case is either the app_name or the full
# module/class reference
- for class_name, job_class in self.config.crontabber.jobs.class_list:
+ class_list = self.config.crontabber.jobs.class_list
+ class_list = self._reorder_class_list(class_list)
+ for class_name, job_class in class_list:
if (
job_class.app_name == description or
description == job_class.__module__ + '.' + job_class.__name__
@@ -791,7 +810,10 @@ def configtest(self):
"""return true if all configured jobs are configured OK"""
# similar to run_all() but don't actually run them
failed = 0
- for class_name, __ in self.config.crontabber.jobs.class_list:
+
+ class_list = self.config.crontabber.jobs.class_list
+ class_list = self._reorder_class_list(class_list)
+ for class_name, __ in class_list:
class_config = self.config.crontabber['class-%s' % class_name]
if not self._configtest_one(class_config):
failed += 1
12 socorro/unittest/cron/jobs/test_matviews.py
View
@@ -112,18 +112,6 @@ def mock_utc_now():
# regular jobs writing only once.
self.assertEqual(self.psycopg2().commit.call_count, 13 * 2 + 2)
- def test_reports_clean_dependency_prerequisite(self):
- config_manager, json_file = self._setup_config_manager(
- 'socorro.cron.jobs.matviews.ReportsCleanCronApp|1d'
- )
-
- with config_manager.context() as config:
- tab = crontabber.CronTabber(config)
- tab.run_all()
-
- # no file is created because it's unable to run anything
- self.assertTrue(not os.path.isfile(json_file))
-
def test_reports_clean_with_dependency(self):
config_manager, json_file = self._setup_config_manager(
'socorro.cron.jobs.matviews.DuplicatesCronApp|1h\n'
144 socorro/unittest/cron/test_crontabber.py
View
@@ -8,6 +8,7 @@
import os
import json
import time
+import unittest
from cStringIO import StringIO
import mock
import psycopg2
@@ -21,6 +22,108 @@
#==============================================================================
+class _Item(object):
+
+ def __init__(self, name, depends_on):
+ self.app_name = name
+ self.depends_on = depends_on
+
+
+class TestReordering(unittest.TestCase):
+
+ def test_basic_already_right(self):
+ sequence = [
+ _Item('A', []),
+ _Item('B', ['A']),
+ _Item('C', ['B']),
+ ]
+ new_sequence = base.reorder_dag(sequence)
+ new_names = [x.app_name for x in new_sequence]
+ self.assertEqual(new_names, ['A', 'B', 'C'])
+
+ def test_three_levels(self):
+ sequence = [
+ _Item('A', []),
+ _Item('B', ['A']),
+ _Item('D', ['B', 'C']),
+ _Item('C', ['B']),
+
+ ]
+ new_sequence = base.reorder_dag(sequence)
+ new_names = [x.app_name for x in new_sequence]
+ self.assertEqual(new_names, ['A', 'B', 'C', 'D'])
+
+ def test_basic_completely_reversed(self):
+ sequence = [
+ _Item('C', ['B']),
+ _Item('B', ['A']),
+ _Item('A', []),
+ ]
+ new_sequence = base.reorder_dag(sequence)
+ new_names = [x.app_name for x in new_sequence]
+ self.assertEqual(new_names, ['A', 'B', 'C'])
+
+ def test_basic_sloppy_depends_on(self):
+ sequence = [
+ _Item('C', ('B',)),
+ _Item('B', 'A'),
+ _Item('A', None),
+ ]
+ new_sequence = base.reorder_dag(sequence)
+ new_names = [x.app_name for x in new_sequence]
+ self.assertEqual(new_names, ['A', 'B', 'C'])
+
+ def test_two_trees(self):
+ sequence = [
+ _Item('C', ['B']),
+ _Item('B', ['A']),
+ _Item('A', []),
+ _Item('X', ['Y']),
+ _Item('Y', []),
+ ]
+ new_sequence = base.reorder_dag(sequence)
+ new_names = [x.app_name for x in new_sequence]
+ self.assertTrue(
+ new_names.index('A')
+ <
+ new_names.index('B')
+ <
+ new_names.index('C')
+ )
+ self.assertTrue(
+ new_names.index('Y')
+ <
+ new_names.index('X')
+ )
+
+ def test_circular_no_roots(self):
+ sequence = [
+ _Item('C', ['B']),
+ _Item('B', ['A']),
+ _Item('A', ['C']),
+ ]
+ self.assertRaises(
+ base.CircularDAGError,
+ base.reorder_dag,
+ sequence
+ )
+
+ def test_circular_one_root_still_circular(self):
+ sequence = [
+ _Item('C', ['B']),
+ _Item('X', ['Y']),
+ _Item('Y', []),
+ _Item('B', ['A']),
+ _Item('A', ['C']),
+ ]
+ self.assertRaises(
+ base.CircularDAGError,
+ base.reorder_dag,
+ sequence
+ )
+
+
+#==============================================================================
class TestJSONJobsDatabase(TestCaseBase):
"""This has nothing to do with Socorro actually. It's just tests for the
underlying JSON database.
@@ -319,7 +422,8 @@ def test_run_all_with_failing_dependency(self):
config_manager, json_file = self._setup_config_manager(
'socorro.unittest.cron.test_crontabber.TroubleJob|1d\n'
'socorro.unittest.cron.test_crontabber.SadJob|1d\n'
- 'socorro.unittest.cron.test_crontabber.BasicJob|1d'
+ 'socorro.unittest.cron.test_crontabber.BasicJob|1d\n'
+ 'socorro.unittest.cron.test_crontabber.FooJob|1d'
)
with config_manager.context() as config:
@@ -328,7 +432,10 @@ def test_run_all_with_failing_dependency(self):
infos = [x[0][0] for x in config.logger.info.call_args_list]
infos = [x for x in infos if x.startswith('Ran ')]
- self.assertEqual(infos, ['Ran TroubleJob', 'Ran BasicJob'])
+ self.assertEqual(
+ infos,
+ ['Ran BasicJob', 'Ran TroubleJob', 'Ran FooJob']
+ )
# note how SadJob couldn't be run!
# let's see what information we have
assert os.path.isfile(json_file)
@@ -366,11 +473,10 @@ def test_run_all_basic_with_failing_dependency_without_errors(self):
# hasn't never run
with config_manager.context() as config:
tab = crontabber.CronTabber(config)
- tab.run_all()
-
- infos = [x[0][0] for x in config.logger.info.call_args_list]
- infos = [x for x in infos if x.startswith('Ran ')]
- self.assertEqual(infos, [])
+ self.assertRaises(
+ base.CircularDAGError,
+ tab.run_all
+ )
def test_run_all_with_failing_dependency_without_errors_but_old(self):
config_manager, json_file = self._setup_config_manager(
@@ -1369,6 +1475,30 @@ def test_nagios_critical(self):
self.assertTrue('NameError' in output)
self.assertTrue('Trouble!!' in output)
+ def test_reorder_dag_on_joblist(self):
+ config_manager, json_file = self._setup_config_manager(
+ 'socorro.unittest.cron.test_crontabber.FooBarJob|1d\n'
+ 'socorro.unittest.cron.test_crontabber.BarJob|1d\n'
+ 'socorro.unittest.cron.test_crontabber.FooJob|1d'
+ )
+ # looking at the dependencies, since FooJob doesn't depend on anything
+ # it should be run first, then BarJob and lastly FooBarJob because
+ # FooBarJob depends on FooJob and BarJob.
+ with config_manager.context() as config:
+ tab = crontabber.CronTabber(config)
+ tab.run_all()
+ structure = json.load(open(json_file))
+ self.assertTrue('foo' in structure)
+ self.assertTrue('bar' in structure)
+ self.assertTrue('foobar' in structure)
+ self.assertTrue(
+ structure['foo']['last_run']
+ <
+ structure['bar']['last_run']
+ <
+ structure['foobar']['last_run']
+ )
+
#==============================================================================
@attr(integration='postgres') # for nosetests
Something went wrong with that request. Please try again.