Skip to content

Commit

Permalink
Merge pull request #1497 from noirbizarre/jobs-commands
Browse files Browse the repository at this point in the history
Jobs commands
  • Loading branch information
noirbizarre committed Mar 13, 2018
2 parents 5963f5f + b067d1a commit 4ae3df1
Show file tree
Hide file tree
Showing 9 changed files with 340 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -17,6 +17,7 @@
- High priority for sendmail tasks [#1484](https://github.com/opendatateam/udata/pull/1484)
- Improve tasks/jobs queues routing [#1487](https://github.com/opendatateam/udata/pull/1487)
- Add security.send_confirmation template [#1475](https://github.com/opendatateam/udata/pull/1475)
- Add the `udata schedule|unschedule|scheduled` commands [#1497](https://github.com/opendatateam/udata/pull/1497)

## 1.2.11 (2018-02-05)

Expand Down
51 changes: 41 additions & 10 deletions docs/administrative-tasks.md
Expand Up @@ -82,16 +82,16 @@ You can list available jobs with:

```shell
$ udata job list
-> log-test
-> purge-organizations
-> purge-datasets
-> bump-metrics
-> purge-reuses
-> error-test
-> harvest
-> send-frequency-reminder
-> crawl-resources
-> count-tags
log-test
purge-organizations
purge-datasets
bump-metrics
purge-reuses
error-test
harvest
send-frequency-reminder
crawl-resources
count-tags
```

You can launch a job with:
Expand All @@ -114,6 +114,37 @@ $ udata job run job-name arg1 arg2 key1=value key2=value
Most of the time, you won't need it because there will be a dedicated command
to perform the task you need.

You can also schedule or unschedule jobs (and list scheduled jobs):

```shell
$ udata job scheduled
# No scheduled jobs
$ udata job schedule "0 * * * *" count-tags
➢ Scheduled Job count-tags with the following crontab: 0 * * * *
$ udata job scheduled
Count tags: count-tags ↦ 0 * * * *
# Same command to reschedule
$ udata job schedule "1 * * * *" count-tags
➢ Scheduled Job count-tags with the following crontab: 1 * * * *
$ udata job scheduled
Count tags: count-tags ↦ 1 * * * *
$ udata job unschedule count-tags
➢ Unscheduled Job count-tags with the following crontab: 0 * * * *
$ udata job scheduled
# No scheduled jobs
```

Because a job can be scheduled multiple times with different parameters,
you need to provide the same parameters to unschedule:

```shell
$ udata job schedule my-job "0 * * * *" arg key=value
➢ Scheduled Job my-job(arg, key=value) with the following crontab: 0 * * * *
$ udata job unschedule my-job
✘ No scheduled job match Job my-job
$ udata job unschedule my-job arg key=value
➢ Unscheduled Job my-job(arg, key=value) with the following crontab: 0 * * * *
```

## Reindexing data

Expand Down
105 changes: 99 additions & 6 deletions udata/core/jobs/commands.py
Expand Up @@ -5,11 +5,24 @@

import click

from udata.commands import cli, exit_with_error
from udata.commands import cli, exit_with_error, echo, white
from udata.tasks import schedulables, celery

from .models import PeriodicTask

log = logging.getLogger(__name__)

SCHEDULE_LINE = '{name}: {label} ↦ {schedule}'


def job_label(name, args, kwargs):
label = name
params = args[:]
params += ['='.join((k, v)) for k, v in sorted(kwargs.items())]
if params:
label += '(' + ', '.join(params) + ')'
return label


@cli.group('job')
def grp():
Expand Down Expand Up @@ -38,9 +51,7 @@ def run(name, params, delay):
if name not in celery.tasks:
exit_with_error('Job %s not found', name)
job = celery.tasks[name]
label = name
if params:
label += '(' + ', '.join(params) + ')'
label = job_label(name, args, kwargs)
if delay:
log.info('Sending job %s', label)
job.delay(*args, **kwargs)
Expand All @@ -54,5 +65,87 @@ def run(name, params, delay):
@grp.command()
def list():
'''List all availables jobs'''
for job in schedulables():
log.info(job.name)
for job in sorted(schedulables()):
echo(job.name)


@grp.command()
@click.argument('cron', metavar='<cron>')
@click.argument('name', metavar='<name>')
@click.argument('params', nargs=-1, metavar='<arg key=value ...>')
def schedule(cron, name, params):
'''
Schedule the job <name> to run periodically given the <cron> expression.
Jobs args and kwargs are given as parameters without dashes.
Ex:
udata job schedule my-job "* * 0 * *" arg1 arg2 key1=value key2=value
'''
if name not in celery.tasks:
exit_with_error('Job %s not found', name)

args = [p for p in params if '=' not in p]
kwargs = dict(p.split('=') for p in params if '=' in p)
label = 'Job {0}'.format(job_label(name, args, kwargs))

try:
task = PeriodicTask.objects.get(task=name, args=args, kwargs=kwargs)
task.modify(crontab=PeriodicTask.Crontab.parse(cron))
except PeriodicTask.DoesNotExist:
task = PeriodicTask.objects.create(
task=name,
name=label,
description='Periodic {0} job'.format(name),
enabled=True,
args=args,
kwargs=kwargs,
crontab=PeriodicTask.Crontab.parse(cron),
)

msg = 'Scheduled {label} with the following crontab: {cron}'
log.info(msg.format(label=label, cron=task.schedule_display))


@grp.command()
@click.argument('name', metavar='<name>')
@click.argument('params', nargs=-1, metavar='<arg key=value ...>')
def unschedule(name, params):
'''
Unschedule the job <name> with the given parameters.
Jobs args and kwargs are given as parameters without dashes.
Ex:
udata job unschedule my-job arg1 arg2 key1=value key2=value
'''
if name not in celery.tasks:
exit_with_error('Job %s not found', name)

args = [p for p in params if '=' not in p]
kwargs = dict(p.split('=') for p in params if '=' in p)
label = 'Job {0}'.format(job_label(name, args, kwargs))

try:
task = PeriodicTask.objects.get(task=name, args=args, kwargs=kwargs)
except PeriodicTask.DoesNotExist:
exit_with_error('No scheduled job match {0}'.format(label))

task.delete()
msg = 'Unscheduled {label} with the following crontab: {cron}'
log.info(msg.format(label=label, cron=task.schedule_display))


@grp.command()
def scheduled():
'''
List scheduled jobs.
'''
for job in sorted(schedulables()):
for task in PeriodicTask.objects(task=job.name):
label = job_label(task.task, task.args, task.kwargs)
echo(SCHEDULE_LINE.format(
name=white(task.name.encode('utf8')),
label=label,
schedule=task.schedule_display
).encode('utf8'))
11 changes: 11 additions & 0 deletions udata/core/jobs/models.py
Expand Up @@ -25,6 +25,17 @@ class Crontab(BasePeriodicTask.Crontab):
def __unicode__(self):
return CRON.format(**self._data)

@classmethod
def parse(cls, cron):
m, h, d, M, W = cron.split()
return cls(
minute=m,
hour=h,
day_of_month=d,
month_of_year=M,
day_of_week=W,
)

@property
def schedule_display(self):
if self.interval:
Expand Down
5 changes: 0 additions & 5 deletions udata/harvest/commands.py
Expand Up @@ -89,11 +89,6 @@ def backends():
log.info('%s (%s)', backend.name, backend.display_name or backend.name)


@grp.command()
def jobs():
'''List started harvest jobs'''


@grp.command()
@click.argument('identifier')
def launch(identifier):
Expand Down
4 changes: 2 additions & 2 deletions udata/tests/cli/test_db_cli.py
Expand Up @@ -47,12 +47,12 @@ def test_unrecord_with_single_parameter_without_extension(cli, migrations):

def test_unrecord_without_parameters(cli, migrations):
'''Should display help without errors'''
result = cli('db unrecord')
result = cli('db unrecord', check=False)
assert result.exit_code != 0
assert migrations.count() == 1

def test_unrecord_with_too_many_parameters(cli, migrations):
'''Should display help without errors'''
result = cli('db unrecord udata test.js too many')
result = cli('db unrecord udata test.js too many', check=False)
assert result.exit_code != 0
assert migrations.count() == 1
6 changes: 6 additions & 0 deletions udata/tests/helpers.py
Expand Up @@ -198,3 +198,9 @@ def full_url(*args, **kwargs):
def data_path(filename):
'''Get a test data path'''
return os.path.join(os.path.dirname(__file__), 'data', filename)


def assert_command_ok(result):
__tracebackhide__ = True
msg = 'Command failed with exit code {0.exit_code} and output:\n{0.output}'
assert result.exit_code == 0, msg.format(result)
12 changes: 8 additions & 4 deletions udata/tests/plugin.py
Expand Up @@ -2,6 +2,7 @@
from __future__ import unicode_literals

import pytest
import shlex
import sys

from contextlib import contextmanager
Expand All @@ -18,7 +19,7 @@
from udata.models import db
from udata.search import es

from .helpers import assert200
from .helpers import assert200, assert_command_ok


class TestClient(FlaskClient):
Expand Down Expand Up @@ -223,16 +224,19 @@ def autoindex(app, clean_db):
@pytest.fixture(name='cli')
def cli_fixture(mocker, app):

def mock_runner(*args):
def mock_runner(*args, **kwargs):
from click.testing import CliRunner
from udata.commands import cli

if len(args) == 1 and ' ' in args[0]:
args = args[0].split()
args = shlex.split(args[0])
runner = CliRunner()
# Avoid instanciating another app and reuse the app fixture
with mocker.patch.object(cli, 'create_app', return_value=app):
return runner.invoke(cli, args, catch_exceptions=False)
result = runner.invoke(cli, args, catch_exceptions=False)
if kwargs.get('check', True):
assert_command_ok(result)
return result

return mock_runner

Expand Down

0 comments on commit 4ae3df1

Please sign in to comment.