
Distinguish between PROGRESS and PENDING for herd avoidance #40

Open
rhunwicks opened this issue Mar 20, 2014 · 6 comments

Comments

@rhunwicks
Contributor

I want to use Jobtastic to manage the refresh of a PostgreSQL materialized view when the user updates the underlying tables. The user performs many updates in succession and refreshing the view takes about 5 minutes. Therefore, I want to:

  • submit the task with countdown=120 so that I wait for 2 minutes after the last update to see if there are any more updates before executing the task
  • if a new task is submitted during that countdown, then I reset the countdown (or replace the old task submission with the new one)
  • once the countdown is reached and the task execution starts, then the next update should put a new PENDING entry on the queue, that doesn't get executed until the current PROGRESS one is finished

Does that use case fit with the intention of Jobtastic?

I can see that at the moment it doesn't distinguish between PROGRESS and PENDING for herd avoidance, and I would need that distinction so that herd avoidance applies to PENDING tasks only.

Part of that change would also be to reset the countdown time on the task when exiting through the herd avoidance path.

My understanding of the acquire_lock code at the moment is that it prevents duplicate tasks from being submitted - I think we might need a separate lock along the lines of http://celery.readthedocs.org/en/latest/tutorials/task-cookbook.html#ensuring-a-task-is-only-executed-one-at-a-time to make sure that the PENDING task isn't executed until the PROGRESS one finishes.
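For reference, the cookbook pattern linked above boils down to an atomic cache `add` used as a lock. A minimal sketch, with a plain dict standing in for the cache backend (in production this would be Django's `cache.add`, which is atomic on memcached):

```python
import time

# Stand-in for a cache backend's atomic add(); Django's cache.add only sets
# the key if it is absent, which is what makes it usable as a lock.
_cache = {}

def cache_add(key, value, timeout):
    """Set key only if absent or expired. Returns True if we got the lock."""
    now = time.time()
    entry = _cache.get(key)
    if entry is not None and entry[1] > now:
        return False
    _cache[key] = (value, now + timeout)
    return True

def cache_delete(key):
    _cache.pop(key, None)

LOCK_EXPIRE = 60 * 10  # lock expires eventually in case a worker dies mid-run

def run_exclusively(lock_id, body):
    """Run body() only if no other worker holds lock_id (cookbook pattern)."""
    if not cache_add(lock_id, 'locked', LOCK_EXPIRE):
        return None  # someone else is already refreshing; skip this run
    try:
        return body()
    finally:
        cache_delete(lock_id)
```

A PENDING task could use this to bail out (or re-queue itself) while the PROGRESS task still holds the lock.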

I'm happy to work on a PR for any required changes if you point me in the right direction.

@winhamwr
Contributor

Hello!

If I were solving this problem, I think I would do something like this:

  • Whenever a change is made, I would set a known cache key with the current timestamp and then I would call a viewRecalcWaitTask (or some such)
  • viewRecalcWaitTask gets called with a 120 second countdown (as you suggested) and all it does when it runs is check that known cache key. If the timestamp is more than 120 seconds old, it just calls the viewRecalcTask that does the recalculation and then immediately exits. If the timestamp is less than 120 seconds old, it just exits without calling the recalculation task.
  • In the viewRecalcTask, record a timestamp as soon as it starts running. Then check the known cache value at the very end of the run. If the cache timestamp is later than the task's starting timestamp, then it knows that it needs to run again because an update happened while it was running. Then you can just start another viewRecalcWaitTask with a 120 second countdown without updating the known cache value.
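The steps above might be sketched as follows. A plain dict stands in for the shared cache and direct function calls stand in for Celery task submissions; the names `view_recalc_wait_task` and `view_recalc_task` are invented for illustration:

```python
cache = {}
WAIT_SECONDS = 120
refresh_count = [0]  # stands in for the expensive materialized-view refresh

def record_change(now):
    """Step 1: on every change, stamp the cache; in Celery you would also
    submit view_recalc_wait_task with countdown=WAIT_SECONDS here."""
    cache['last_change'] = now

def view_recalc_wait_task(now):
    """Step 2: fires WAIT_SECONDS after a change; recalculates only if no
    newer change has moved the timestamp forward."""
    if now - cache['last_change'] >= WAIT_SECONDS:
        view_recalc_task(now)
    # otherwise a newer change has its own wait task pending; exit quietly

def view_recalc_task(started):
    """Step 3: do the refresh, then check whether an update arrived while
    we were running and, if so, queue another wait task."""
    refresh_count[0] += 1  # the 5-minute refresh would happen here
    if cache['last_change'] > started:
        # in Celery: view_recalc_wait_task.apply_async(countdown=WAIT_SECONDS)
        # without touching the cached timestamp
        pass
```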

I think that will result in the behavior you're looking for. Thoughts?

-Wes

@winhamwr
Contributor

And if you could think of a good way to include this kind of pattern in Jobtastic with an understandable API, that would be super-cool. Maybe a generic groupedWaitTask and a helper method to get/set the cache with the timestamp?

@rhunwicks
Contributor Author

This is my first cut...

  • apply_async stores the time the Task should wait until in the cache and then calls super.
  • run checks if the wait time has been reached:
    • if it hasn't, then it queues a new Task to run at the wait time and returns None, so the Task is treated as SUCCESS by Celery
    • if the wait time has been reached, then it runs the task as normal
    • after running the task, it checks whether the task was resubmitted while the job was running and, if so, submits a new Task for the end of the current wait time.

Does this look sensible to you?

from datetime import datetime, timedelta
import logging

import pytz
from django.core.cache import cache
from jobtastic import JobtasticTask

try:
    from celery.utils.log import get_task_logger
except ImportError:  # Celery 2.x
    get_task_logger = None


class GroupedWaitTask(JobtasticTask):
    """
    An extension of ``JobtasticTask`` that waits a specified number of seconds
    since the last time it was submitted before it runs.

    This supports tasks that must be run when a set of updates has been complete
    but you don't know when that will be.

    One example is refreshing a summary table (perhaps implemented as a
    materialized view): the refresh must happen in a timely manner after the
    user updates the underlying records, but will be out of date as soon as
    another update is made. Therefore, we want to wait for a short time after
    each update is made to see if there are any more updates, and then run the
    refresh task after the "wait period" expires.

    The following class members are required:

    * ``wait_seconds`` The number of seconds since the last time this task was
      submitted that must elapse before the task can run.
    """
    abstract = True
    ignore_result = True

    @classmethod
    def apply_async(self, args, kwargs, **options):
        """
        Store the timestamp of the most recent submission for this task
        """
        self._validate_required_class_vars()
        cache_key = self._get_cache_key(**kwargs)

        # Store expiry time for the wait period
        if 'eta' in options:
            wait = options['eta']
        elif 'countdown' in options:
            wait = datetime.now(pytz.utc) + timedelta(seconds=options['countdown'])
        else:
            wait = datetime.now(pytz.utc) + timedelta(seconds=self.wait_seconds)
        logging.info("Setting %s to wait until %s", self.name, wait)
        cache.set('wait:%s' % cache_key, wait)

        return super(GroupedWaitTask, self).apply_async(args, kwargs, **options)

    def run(self, *args, **kwargs):
        cache_key = self._get_cache_key(**kwargs)

        if get_task_logger:
            self.logger = get_task_logger(self.__class__.__name__)
        else:
            # Celery 2.X fallback
            self.logger = self.get_logger(**kwargs)

        # If we haven't reached the end of the waiting period then schedule a
        # new task for then and exit. Guard against a cache miss returning None.
        wait = cache.get('wait:%s' % cache_key)
        if wait is not None and wait >= datetime.now(pytz.utc) and not self.request.is_eager:
            self.logger.info("Deferring %s until %s", self.__class__.__name__, wait)
            # Remove the existing herd protection because we want to submit a new task
            cache.delete('herd:%s' % cache_key)
            self.apply_async(args, kwargs, eta=wait)
            return None

        # We have reached the end of the wait period, so calculate the result
        result = super(GroupedWaitTask, self).run(*args, **kwargs)

        # If a task was submitted after we started calculating the result, then
        # submit a new job now
        new_wait = cache.get('wait:%s' % cache_key)
        if new_wait is not None and (wait is None or new_wait > wait) and not self.request.is_eager:
            self.logger.info("Resubmitting %s for %s", self.__class__.__name__, new_wait)
            self.apply_async(args, kwargs, eta=new_wait)

        return result

    @classmethod
    def _validate_required_class_vars(self):
        """
        Ensure that this subclass has defined all of the required class
        variables.
        """
        required_members = (
            'wait_seconds',
        )
        for required_member in required_members:
            if not hasattr(self, required_member):
                raise Exception(
                    "GroupedWaitTask subclasses must define %s" % required_member)
        super(GroupedWaitTask, self)._validate_required_class_vars()
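The eta/countdown precedence inside ``apply_async`` above can be pulled out and exercised on its own. A stand-alone mirror of that logic (using the stdlib ``timezone.utc`` instead of pytz, purely for this sketch):

```python
from datetime import datetime, timedelta, timezone

def compute_wait(options, wait_seconds):
    """Mirror the precedence used in apply_async: an explicit eta wins,
    then countdown, then the class-level wait_seconds default."""
    if 'eta' in options:
        return options['eta']
    if 'countdown' in options:
        return datetime.now(timezone.utc) + timedelta(seconds=options['countdown'])
    return datetime.now(timezone.utc) + timedelta(seconds=wait_seconds)
```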

@winhamwr
Contributor

This looks great! It's probably worth some tests with mocking to ensure we're hitting the right paths, but from reading through, I think this will do what we want. It's also better than my proposed API. This being effectively a wrapper around the task that actually does the work is quite slick.

The only thing missing here (besides it being a pull request) is documentation for the README plus, ideally, at least some tests for regression purposes. Supporting multiple versions of Celery without those is pretty tough.

This is very cool!

Thanks
-Wes

@rhunwicks
Contributor Author

I'm happy to turn it into a PR and do docs. I'd like to do tests too, but I'm not sure where to start. I tried writing tests using my normal (django-based) approach, but it doesn't work in the absence of a working Celery server, and my attempts to get one to run under the control of the Django test runner have been unsuccessful so far. If you could provide an example, I'll add other test cases to it.

@winhamwr
Contributor

I can completely relate to coming from a Django testing background.

This test might shed some light. I find mock to be very necessary when writing Celery tests in general, but especially when testing something like Jobtastic.
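To make the mocking approach concrete: rather than running a real broker, you can patch ``apply_async`` and assert on how it was called. This is a hypothetical sketch (the ``FakeTask`` class and its behavior are invented here; it stands in for just the deferral branch of ``GroupedWaitTask.run``):

```python
from unittest import mock

class FakeTask:
    """Minimal stand-in for the deferral branch of GroupedWaitTask.run."""
    def __init__(self, wait):
        self.wait = wait

    def apply_async(self, args, kwargs, **options):
        raise RuntimeError("should be patched out in tests")

    def run(self, now, *args, **kwargs):
        if self.wait >= now:
            # still inside the wait window: re-queue for the wait time
            self.apply_async(args, kwargs, eta=self.wait)
            return None
        return "result"

def test_deferral_resubmits_with_eta():
    task = FakeTask(wait=200)
    # Patch apply_async so no broker is needed, then check the deferral path
    with mock.patch.object(task, "apply_async") as apply_async:
        assert task.run(now=100) is None
    apply_async.assert_called_once_with((), {}, eta=200)
```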

-Wes
