Use Spinach task scheduler instead of Celery. (#2381)
Co-Authored-By: William Lachance <wlachance@mozilla.com>
jezdez and wlach committed Oct 29, 2019
1 parent 5a76520 commit 1891a59
Showing 13 changed files with 48 additions and 69 deletions.
3 changes: 0 additions & 3 deletions .gitignore
@@ -84,6 +84,3 @@ wallaby.js
 
 # mkdocs
 site
-
-# celery
-celerybeat-schedule
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,5 +1,6 @@
 # (Unreleased; add upcoming change notes here)
 
+- Use Spinach task scheduler instead of Celery: simpler, and eliminates the need for a worker beat node (#2381)
 - Fix reps bug involving proxies (#2254, #2237, #1748)
 
 # 0.14.0 (2019-10-24)
4 changes: 1 addition & 3 deletions Procfile
@@ -1,5 +1,3 @@
 web: gunicorn server.wsgi
-worker_beat: celery beat -A server
-worker: celery -A server worker
-
+worker: python manage.py spinach
 release: ./bin/pre_deploy
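
Note the process-count win here: Spinach workers pick up periodic tasks themselves, so the dedicated worker_beat dyno disappears. As a rough sketch of what python manage.py spinach amounts to (assuming spinach.contrib.spinachd builds an engine from the SPINACH_BROKER setting; the exact wiring is the library's, and the names below are illustrative):

from spinach import Engine

from server.files.tasks import tasks  # the Tasks registry introduced in this commit
from server.settings import SPINACH_BROKER

spin = Engine(SPINACH_BROKER)
spin.attach_tasks(tasks)
spin.start_workers()  # blocks; workers also fire tasks declared with periodicity=...
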
7 changes: 1 addition & 6 deletions bin/run
@@ -5,8 +5,6 @@ set -eo pipefail
 : "${PORT:=8000}"
 : "${SLEEP:=1}"
 : "${TRIES:=60}"
-: "${MONITOR_PIDFILE:=/app/celerymonitor.pid}"
-: "${SCHEDULER_PIDFILE:=/app/celerybeat.pid}"
 
 usage() {
   echo "usage: bin/run dev"
@@ -40,10 +38,7 @@ case $1 in
     exec python manage.py runserver 0.0.0.0:${PORT}
     ;;
   worker)
-    celery -A server worker -l debug --events
-    ;;
-  beat)
-    celery -A server beat --pidfile ${SCHEDULER_PIDFILE} -l debug
+    exec python manage.py spinach
     ;;
   tests)
     shift
4 changes: 0 additions & 4 deletions docker-compose.yml
@@ -23,10 +23,6 @@ services:
     <<: *app
     command: worker
 
-  beat:
-    <<: *app
-    command: beat
-
   db:
     image: postgres:9.5-alpine
     logging:
2 changes: 1 addition & 1 deletion pytest.ini
@@ -1,6 +1,6 @@
 [pytest]
 norecursedirs = .git .* static __pycache__ build
-DJANGO_SETTINGS_MODULE = server.settings
+DJANGO_SETTINGS_MODULE = server.tests.settings
 python_files = tests.py test_*.py *_tests.py
 testpaths = server
 addopts = -rsxX --showlocals --tb=native --nomigrations --flake8 --staticfiles --isort --cov-report term --cov-report xml --cov server
17 changes: 5 additions & 12 deletions requirements/build.txt
@@ -29,15 +29,6 @@ Brotli==1.0.7 \
     --hash=sha256:fc7212e36ebeb81aebf7949c92897b622490d7c0e333a479c0395591e7994600
 PyJWT==1.7.1 \
     --hash=sha256:5c6eca3c2940464d106b99ba83b00c6add741c9becaec087fb7ccdefea71350e
-amqp==2.5.1 \
-    --hash=sha256:19a917e260178b8d410122712bac69cb3e6db010d68f6101e7307508aded5e68 \
-    --hash=sha256:19d851b879a471fcfdcf01df9936cff924f422baa77653289f7095dedd5fb26a
-billiard==3.6.1.0 \
-    --hash=sha256:01afcb4e7c4fd6480940cfbd4d9edc19d7a7509d6ada533984d0d0f49901ec82 \
-    --hash=sha256:b8809c74f648dfe69b973c8e660bcec00603758c9db8ba89d7719f88d5f01f26
-celery==4.4.0rc3 \
-    --hash=sha256:821d11967f0f3f8fe24bd61ecfc7b6acbb5a926b719f1e8c4d5ff7bc09e18d81 \
-    --hash=sha256:ae4541fb3af5182bd4af749fee9b89c4858f2792d34bb5d034967e662cf9b55c
 certifi==2019.9.11 \
     --hash=sha256:fd7c7c74727ddcf00e9acd26bba8da604ffec95bf1c2144e67aff7a8b50e6cef \
     --hash=sha256:e4f3620cfea4f83eedc95b24abd9cd56f3c4b146dd0177e83a21b4eb49e21e50
@@ -73,9 +64,6 @@ idna==2.8 \
 importlib-metadata==0.23 \
     --hash=sha256:d5f18a79777f3aa179c145737780282e27b508fc8fd688cb17c7a813e8bd39af \
     --hash=sha256:aa18d7378b00b40847790e7c27e11673d7fed219354109d0e7b9e5b25dc3ad26
-kombu==4.6.5 \
-    --hash=sha256:31edb84947996fdda065b6560c128d5673bb913ff34aa19e7b84755217a24deb \
-    --hash=sha256:c9078124ce2616b29cf6607f0ac3db894c59154252dee6392cdbbe15e5c4b566
 more-itertools==7.2.0 \
     --hash=sha256:92b8c4b06dac4f0611c0729b2f2ede52b2e1bac1ab48f089c7ddc12e26bb60c4 \
     --hash=sha256:409cd48d4db7052af495b09dec721011634af3753ae1ef92d2b32f73a745f832
@@ -170,3 +158,8 @@ whitenoise==4.1.4 \
 zipp==0.6.0 \
     --hash=sha256:f06903e9f1f43b12d371004b4ac7b06ab39a44adc747266928ae6debfa7b3335 \
     --hash=sha256:3718b1cbcd963c7d4c5511a8240812904164b7f381b647143a89d3b98f9bcd8e
+spinach==0.0.11 \
+    --hash=sha256:20e6b802cd2c7855173009aabbcf5d2a48bdbf93c2a13d3e78a777b5300b128f \
+    --hash=sha256:d1f639d91773069e6dcf588e208027ba44e6f9f26a3dafd3f2385a6911101a40
+blinker==1.4 \
+    --hash=sha256:471aee25f3992bd325afa3772f1063dbdbbca947a041b8b89466dc00d606f8b6
19 changes: 0 additions & 19 deletions server/celery.py

This file was deleted.

4 changes: 2 additions & 2 deletions server/files/api_views.py
@@ -15,7 +15,7 @@
     FilesSerializer,
     FileUpdateOperationSerializer,
 )
-from .tasks import execute_file_update_operation
+from .tasks import execute_file_update_operation, tasks
 
 
 class FileViewSet(viewsets.ModelViewSet):
@@ -147,6 +147,6 @@ def create(self, serializer):
             raise PermissionDenied
 
         update_operation = FileUpdateOperation.objects.create(file_source=file_source)
-        execute_file_update_operation.apply_async(args=[update_operation.id])
+        tasks.schedule(execute_file_update_operation, update_operation.id)
 
         return Response(FileUpdateOperationSerializer(update_operation).data, status=201)
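
The call-site change above is the whole migration pattern in miniature: Celery enqueued work via apply_async with a serialized args list, while Spinach's registry takes the task function plus plain positional arguments. Side by side (both lines appear in the hunk above):

# Celery (removed): enqueue by args list
execute_file_update_operation.apply_async(args=[update_operation.id])

# Spinach (added): schedule through the Tasks registry
tasks.schedule(execute_file_update_operation, update_operation.id)
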
14 changes: 9 additions & 5 deletions server/files/tasks.py
@@ -4,14 +4,18 @@
 import requests
 from django.conf import settings
 from django.utils import timezone
+from spinach import Tasks
 
-from ..celery import celery
 from .models import File, FileSource, FileUpdateOperation
 
 logger = logging.getLogger(__name__)
 
+tasks = Tasks()
 
-@celery.task
+ONE_DAY = datetime.timedelta(days=1)
+
+
+@tasks.task(name="files:execute_file_update_operation")
 def execute_file_update_operation(update_operation_id):
     update_operation = FileUpdateOperation.objects.get(id=update_operation_id)
 
@@ -46,10 +50,10 @@ def execute_file_update_operation(update_operation_id):
     update_operation.save()
 
 
-@celery.task
+@tasks.task(name="files:execute_scheduled_file_operations", periodicity=ONE_DAY)
 def execute_scheduled_file_operations():
     # this runs once a day, always queue daily operations
-    intervals_to_queue = [datetime.timedelta(days=1)]
+    intervals_to_queue = [ONE_DAY]
     # if it is monday, include the weekly ones as well
     if datetime.datetime.today().weekday() == 0:
         intervals_to_queue.append(datetime.timedelta(weeks=1))
@@ -61,4 +65,4 @@ def execute_scheduled_file_operations():
     file_sources = FileSource.objects.filter(update_interval__in=intervals_to_queue)
     for file_source in file_sources:
         update_operation = FileUpdateOperation.objects.create(file_source=file_source)
-        execute_file_update_operation.apply_async(args=[update_operation.id])
+        tasks.schedule(execute_file_update_operation, update_operation.id)
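
For readers new to Spinach, the module above exercises its whole core API: a Tasks registry, named tasks, and a periodicity interval in place of a beat schedule. A self-contained sketch of the same pattern, modeled on Spinach's quickstart (MemoryBroker here only avoids needing Redis):

import datetime

from spinach import Engine, MemoryBroker, Tasks

tasks = Tasks()


@tasks.task(name="demo:add")
def add(a, b):
    print(f"{a} + {b} = {a + b}")


# periodicity replaces a Celery beat entry: workers run this on the interval
@tasks.task(name="demo:tick", periodicity=datetime.timedelta(days=1))
def tick():
    print("daily housekeeping")


spin = Engine(MemoryBroker())
spin.attach_tasks(tasks)
spin.schedule(add, 2, 3)  # same as tasks.schedule(add, 2, 3) once attached
spin.start_workers()  # blocks and processes jobs
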
22 changes: 13 additions & 9 deletions server/settings.py
@@ -14,8 +14,10 @@
 import re
 
 import environ
-from celery.schedules import crontab
+import redis
+from django.utils.log import DEFAULT_LOGGING
 from furl import furl
+from spinach.brokers.redis import RedisBroker, recommended_socket_opts
 
 env = environ.Env()
 
@@ -97,6 +99,7 @@
     "server.jwt",
     "server.notebooks",
     "server.files",
+    "spinach.contrib.spinachd",
 ]
 
 RESTRICT_API = env.bool("RESTRICT_API", default=False)
@@ -164,6 +167,14 @@
     }
 ]
 
+LOGGING = DEFAULT_LOGGING.copy()
+LOGGING["loggers"].update(
+    {
+        "spinach": {"handlers": ["console"], "level": "INFO"},
+        "server": {"handlers": ["console"], "level": "INFO"},
+    }
+)
+
 # When DEBUG is True, allow HTTP traffic, otherwise, never allow HTTP traffic.
 SECURE_SSL_REDIRECT = env.bool("SECURE_SSL_REDIRECT", default=not DEBUG)
 SECURE_HSTS_SECONDS = env.int("SECURE_HSTS_SECONDS", default="31536000")
@@ -237,11 +248,4 @@
 
 REDIS_HOST = env.str("REDIS_HOST", default="redis")
 REDIS_URL = env.str("REDIS_URL", default=f"redis://{REDIS_HOST}:6379/1")
-CELERY_BROKER_URL = REDIS_URL
-CELERY_RESULT_BACKEND = REDIS_URL
-CELERY_BEAT_SCHEDULE = {
-    "run_scheduled_file_operations": {
-        "task": "server.files.tasks.execute_scheduled_file_operations",
-        "schedule": crontab(minute=0, hour=0),
-    }
-}
+SPINACH_BROKER = RedisBroker(redis.from_url(REDIS_URL, **recommended_socket_opts))
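
One broker setting replaces four Celery settings, with the schedule moving onto the task decorator. A behavioral nuance worth flagging: crontab(minute=0, hour=0) fired at midnight exactly, whereas periodicity=ONE_DAY runs roughly every 24 hours from when the periodic task is registered, not at a fixed clock time. The before/after mapping (both halves quoted from this commit):

# Before (Celery): cron-style schedule kept in settings
CELERY_BEAT_SCHEDULE = {
    "run_scheduled_file_operations": {
        "task": "server.files.tasks.execute_scheduled_file_operations",
        "schedule": crontab(minute=0, hour=0),
    }
}

# After (Spinach): the interval lives on the task itself, in server/files/tasks.py
@tasks.task(name="files:execute_scheduled_file_operations", periodicity=ONE_DAY)
def execute_scheduled_file_operations():
    ...
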
5 changes: 5 additions & 0 deletions server/tests/settings.py
@@ -0,0 +1,5 @@
+from spinach import MemoryBroker
+
+from server.settings import *  # noqa
+
+SPINACH_BROKER = MemoryBroker()
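
MemoryBroker keeps queued jobs in process memory, so the test run needs no Redis instance; the tests below go one step further and patch tasks.schedule so nothing executes at all. A minimal sketch of the in-memory setup (illustrative, not part of the commit):

from spinach import Engine, MemoryBroker

from server.files.tasks import tasks

# Jobs never leave the test process; no Redis required.
spin = Engine(MemoryBroker())
spin.attach_tasks(tasks)
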
15 changes: 10 additions & 5 deletions server/tests/test_file_update_operations.py
@@ -113,7 +113,7 @@ def test_run_scheduled_file_operations(fake_user, test_notebook, date):
     daily_ids = set([file_sources[0].id])
 
     with freeze_time(date):
-        with patch("server.files.tasks.execute_file_update_operation.apply_async") as mock_task:
+        with patch("server.files.tasks.tasks.schedule") as mock_schedule:
             execute_scheduled_file_operations()
             update_operations = FileUpdateOperation.objects.all()
             if date == "2019-07-08":
@@ -123,14 +123,17 @@ def test_run_scheduled_file_operations(fake_user, test_notebook, date):
                 # only the daily task should have run
                 assert set(update_operations.values_list("file_source_id", flat=True)) == daily_ids
             # also make sure we queued the relevant async tasks
-            mock_task.assert_has_calls(
-                [call(args=[id]) for id in update_operations.values_list("id", flat=True)]
+            mock_schedule.assert_has_calls(
+                [
+                    call(execute_file_update_operation, id)
+                    for id in update_operations.values_list("id", flat=True)
+                ]
             )
 
 
 @pytest.mark.freeze_time("2017-05-21")
 def test_post_file_update_operation(fake_user, test_notebook, test_file_source, client):
-    with patch("server.files.tasks.execute_file_update_operation.apply_async") as mock_task:
+    with patch("server.files.tasks.tasks.schedule") as mock_schedule:
         client.force_login(user=fake_user)
         resp = client.post(
             reverse("file-update-operations-list"), {"file_source_id": test_file_source.id}
@@ -147,4 +150,6 @@ def test_post_file_update_operation(fake_user, test_notebook, test_file_source,
         assert file_update_operation.ended_at is None
 
         # verify that the expected task has been queued
-        mock_task.assert_has_calls([call(args=[file_update_operation.id])])
+        mock_schedule.assert_has_calls(
+            [call(execute_file_update_operation, file_update_operation.id)]
+        )
