Skip to content

Commit

Permalink
Merge pull request #1745 from twobraids/crontabber-transaction
Browse files Browse the repository at this point in the history
fixes Bug 950735 (v78) -  refactor crontabber to use the transaction_executor
  • Loading branch information
lonnen committed Mar 7, 2014
2 parents aa3fb06 + 7f8a6e3 commit 555e1d5
Show file tree
Hide file tree
Showing 22 changed files with 1,263 additions and 669 deletions.
227 changes: 93 additions & 134 deletions socorro/cron/base.py
Expand Up @@ -5,20 +5,22 @@
import collections
import datetime
import re
import subprocess

from socorro.lib.datetimeutil import utc_now
from configman import Namespace, RequiredConfig


#==============================================================================
class FrequencyDefinitionError(Exception):
    """Raised when a frequency specification can't be interpreted."""


#==============================================================================
class CircularDAGError(Exception):
    """Raised when job dependencies form a cycle and therefore cannot
    be ordered."""


#==============================================================================
def reorder_dag(sequence,
depends_getter=lambda x: x.depends_on,
name_getter=lambda x: x.app_name,
Expand Down Expand Up @@ -82,6 +84,7 @@ def reorder_dag(sequence,
return [map_[x] for x in ordered_jobs]


#==============================================================================
def convert_frequency(frequency):
"""return the number of seconds that a certain frequency string represents.
For example: `1d` means 1 day which means 60 * 60 * 24 seconds.
Expand All @@ -103,154 +106,110 @@ def convert_frequency(frequency):
return number


#==============================================================================
class BaseCronApp(RequiredConfig):
    """The base class from which Socorro cron apps are based. Subclasses
    should use the cron app class decorators below to add features such as
    PostgreSQL connections or backfill capability."""
    required_config = Namespace()

    #--------------------------------------------------------------------------
    def __init__(self, config, job_information):
        self.config = config
        # state about previous runs (keys seen below: 'last_success',
        # 'first_run') used by main() to decide whether backfill is needed
        self.job_information = job_information

    # commented out because it doesn't work and I don't know why!
    # def __repr__(self):  # pragma: no cover
    #     return ('<%s (app_name: %r, app_version:%r)>' % (
    #         self.__class__,
    #         self.app_name,
    #         self.app_version))

    #--------------------------------------------------------------------------
    def main(self, function=None, once=True):
        """generator that drives the cron app: invokes `function`
        (defaulting to the _run_proxy indirection) once, or once per missed
        interval when backfilling, yielding the effective datetime of each
        invocation."""
        if not function:
            function = self._run_proxy
        now = utc_now()
        # handle one of four possible cases

        # case 1: no backfill, just run this job now
        if once:
            function()
            yield now
            return

        # case 2: this could be a backfill, but we don't have any
        # job information.  Run it with today's date
        if not self.job_information:
            function(now)
            yield now
            return

        # back fill cases:
        # figure out when it was last run successfully
        last_success = self.job_information.get(
            'last_success',
            self.job_information.get('first_run')
        )

        # case 3: either it has never run successfully or it was previously
        # run before the 'first_run' key was added (legacy).
        if not last_success:
            self.config.logger.warning(
                'No previous last_success information available'
            )
            # just run it for the time 'now'
            function(now)
            yield now
            return

        # case 4:
        when = last_success
        # The last_success datetime is originally based on the
        # first_run. From then onwards it just adds the interval to
        # it so the hour is not likely to drift from that original
        # time.
        # However, what can happen is that on a given day, "now" is
        # LESS than the day before. This can happen because the jobs
        # that are run BEFORE are variable in terms of how long it
        # takes. Thus, one day, now might be "18:02" and the next day
        # it's "18:01". If this happens the original difference
        # will prevent it from running the backfill again.
        #
        # For more info see the
        # test_backfilling_with_configured_time_slow_job unit test.
        if self.config.time:
            # So, reset the hour/minute part to always match the
            # intention.
            h, m = [int(x) for x in self.config.time.split(':')]
            when = when.replace(
                hour=h,
                minute=m,
                second=0,
                microsecond=0
            )
        seconds = convert_frequency(self.config.frequency)
        interval = datetime.timedelta(seconds=seconds)
        # loop over each missed interval from the time of the last success,
        # forward by each interval until it reaches the time 'now'. Run the
        # cron app once for each interval.
        while (when + interval) < now:
            when += interval
            function(when)
            yield when

    #--------------------------------------------------------------------------
    def _run_proxy(self, *args, **kwargs):
        """this is indirection to the run function. By executing this method
        instead of the actual "run" method directly, we can use inheritance
        to provide some resources to the run function via the run function's
        arguments"""
        return self.run(*args, **kwargs)

    #--------------------------------------------------------------------------
    def run(self):  # pragma: no cover
        # crontabber apps should define their own run functions and not rely
        # on these base classes. This default base method threatens a runtime
        # error
        raise NotImplementedError("Your fault!")

class BaseBackfillCronApp(BaseCronApp):
    """a cron app that runs once per missed interval (backfill mode)
    rather than only once."""

    def main(self, function=None):
        # delegate to the base class, forcing backfill mode
        return super(BaseBackfillCronApp, self).main(
            function=function,
            once=False
        )

    def _run_proxy(self, date):
        # hand the target datetime through to the subclass's run method
        return self.run(date)

    def run(self, date):  # pragma: no cover
        raise NotImplementedError("Your fault!")


class PostgresCronApp(BaseCronApp):
    """a cron app whose run method receives a database connection
    (presumably PostgreSQL, per the name — see database_class config)."""

    def _run_proxy(self):
        db_factory = self.config.database.database_class(
            self.config.database
        )
        # the factory's context manager yields a live connection
        with db_factory() as connection:
            self.run(connection)

    def run(self, connection):  # pragma: no cover
        raise NotImplementedError("Your fault!")


class PostgresBackfillCronApp(BaseBackfillCronApp):
    """backfill cron app whose run method receives both a database
    connection and the target datetime."""

    def _run_proxy(self, date):
        db_factory = self.config.database.database_class(
            self.config.database
        )
        # the factory's context manager yields a live connection
        with db_factory() as connection:
            self.run(connection, date)

    def run(self, connection, date):  # pragma: no cover
        raise NotImplementedError("Your fault!")


class PostgresTransactionManagedCronApp(BaseCronApp):
    """cron app variant that delegates execution of its run method to the
    configured transaction executor (transaction_executor_class)."""

    # XXX put transaction_executor here?

    #--------------------------------------------------------------------------
    def main(self):
        database = self.config.database.database_class(self.config.database)
        executor = self.config.database.transaction_executor_class(
            self.config.database,
            database
        )
        # NOTE(review): the executor presumably supplies the connection
        # argument expected by run — confirm in transaction_executor_class
        executor(self.run)
        yield utc_now()

    #--------------------------------------------------------------------------
    def run(self, connection):  # pragma: no cover
        raise NotImplementedError("Your fault!")

class SubprocessMixin(object):
    """mixin providing a convenience wrapper around subprocess for cron
    apps that need to run external commands."""

    def run_process(self, command, input=None):
        """Run the command and return a tuple of three things.
        1. exit code - an integer number
        2. stdout - all output that was sent to stdout, stripped
        3. stderr - all output that was sent to stderr, stripped

        `command` may be a string, or a tuple/list of arguments which is
        joined into a quoted string.  `input`, if given, is fed to the
        process's stdin.
        """
        if isinstance(command, (tuple, list)):
            # quote each piece so arguments containing spaces survive
            # the trip through the shell
            command = ' '.join('"%s"' % x for x in command)

        # NOTE(review): shell=True means the command string is interpreted
        # by the shell — callers must never pass untrusted input here
        proc = subprocess.Popen(
            command,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        out, err = proc.communicate(input=input)
        return proc.returncode, out.strip(), err.strip()

0 comments on commit 555e1d5

Please sign in to comment.