Skip to content

Commit

Permalink
upgrade to celery 5.2.7 (#5168)
Browse files Browse the repository at this point in the history
  • Loading branch information
escattone committed Jun 28, 2022
1 parent 034efac commit e6491b8
Show file tree
Hide file tree
Showing 16 changed files with 141 additions and 87 deletions.
2 changes: 1 addition & 1 deletion bin/run-celery-worker.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/bin/bash

exec newrelic-admin run-program celery -A kitsune worker --maxtasksperchild=${CELERY_WORKER_MAX_TASKS_PER_CHILD:-25}
exec newrelic-admin run-program celery -A kitsune worker --max-tasks-per-child=${CELERY_WORKER_MAX_TASKS_PER_CHILD:-25}
4 changes: 2 additions & 2 deletions kitsune/announcements/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
from django.utils.translation import ugettext as _

import bleach
from celery import task
from celery import shared_task

from kitsune.announcements.models import Announcement
from kitsune.sumo.email_utils import make_mail, safe_translation, send_messages


@task()
@shared_task
def send_group_email(announcement_id):
"""Build and send the announcement emails to a group."""
try:
Expand Down
6 changes: 3 additions & 3 deletions kitsune/customercare/tasks.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
from celery import task
from celery import shared_task
from django.contrib.auth.models import User

from kitsune.customercare.zendesk import ZendeskClient


@task
@shared_task
def update_zendesk_user(user_id: int) -> None:
user = User.objects.get(pk=user_id)
if user.profile.zendesk_id:
zendesk = ZendeskClient()
zendesk.update_user(user)


@task
@shared_task
def update_zendesk_identity(user_id: int, email: str) -> None:
user = User.objects.get(pk=user_id)
zendesk_user_id = user.profile.zendesk_id
Expand Down
4 changes: 2 additions & 2 deletions kitsune/kbadge/tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from celery import task
from celery import shared_task
from django.conf import settings
from django.contrib.sites.models import Site
from django.utils.translation import pgettext
Expand All @@ -9,7 +9,7 @@
from kitsune.sumo import email_utils


@task()
@shared_task
def send_award_notification(award_id: int):
"""Sends the award notification email
Expand Down
4 changes: 2 additions & 2 deletions kitsune/messages/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from django.urls import reverse
from django.utils.translation import ugettext as _

from celery import task
from celery import shared_task

from kitsune.messages.models import InboxMessage
from kitsune.sumo.email_utils import make_mail, safe_translation, send_messages
Expand All @@ -14,7 +14,7 @@
log = logging.getLogger("k.task")


@task()
@shared_task
def email_private_message(inbox_message_id):
"""Send notification of a new private message."""
inbox_message = InboxMessage.objects.get(id=inbox_message_id)
Expand Down
8 changes: 4 additions & 4 deletions kitsune/notifications/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import requests
import simplejson
from actstream.models import Action, Follow
from celery import task
from celery import shared_task
from django.contrib.contenttypes.models import ContentType
from django.db.models import Q
from requests.exceptions import RequestException
Expand Down Expand Up @@ -81,7 +81,7 @@ def _send_simple_push(endpoint, version, max_retries=3, _retry_count=0):
logger.error("SimplePush error: %s %s", r.status_code, r.json())


@task(ignore_result=True)
@shared_task
def add_notification_for_action(action_id: int):
action = Action.objects.get(id=action_id)

Expand All @@ -98,7 +98,7 @@ def add_notification_for_action(action_id: int):
Notification.objects.create(owner=u, action=action)


@task(ignore_result=True)
@shared_task
def send_realtimes_for_action(action_id: int):
action = Action.objects.get(id=action_id)
query = _full_ct_query(action)
Expand All @@ -110,7 +110,7 @@ def send_realtimes_for_action(action_id: int):
_send_simple_push(reg.endpoint, action.id)


@task(ignore_result=True)
@shared_task
def send_notification(notification_id: int):
"""Call every notification handler for a notification."""
notification = Notification.objects.get(id=notification_id)
Expand Down
10 changes: 5 additions & 5 deletions kitsune/questions/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from datetime import date
from typing import Dict

from celery import task
from celery import shared_task
from django.conf import settings
from django.contrib.auth.models import User
from django.db import connection, transaction
Expand All @@ -16,7 +16,7 @@
log = logging.getLogger("k.task")


@task(rate_limit="1/s")
@shared_task(rate_limit="1/s")
def update_question_votes(question_id):
from kitsune.questions.models import Question

Expand All @@ -30,7 +30,7 @@ def update_question_votes(question_id):
log.info("Question id=%s deleted before task." % question_id)


@task(rate_limit="4/s")
@shared_task(rate_limit="4/s")
def update_question_vote_chunk(data):
"""Update num_votes_past_week for a number of questions."""

Expand All @@ -57,7 +57,7 @@ def update_question_vote_chunk(data):
transaction.commit()


@task(rate_limit="4/m")
@shared_task(rate_limit="4/m")
def update_answer_pages(question_id: int):
from kitsune.questions.models import Question

Expand All @@ -79,7 +79,7 @@ def update_answer_pages(question_id: int):
i += 1


@task()
@shared_task
def maybe_award_badge(badge_template: Dict, year: int, user_id: int):
"""Award the specific badge to the user if they've earned it."""
badge = get_or_create_badge(badge_template, year)
Expand Down
10 changes: 5 additions & 5 deletions kitsune/search/es7_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import importlib
import inspect

from celery import task
from celery import shared_task
from django.conf import settings
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk as es7_bulk
Expand Down Expand Up @@ -119,7 +119,7 @@ def get_doc_types(paths=["kitsune.search.documents"]):
return doc_types


@task
@shared_task
def index_object(doc_type_name, obj_id):
"""Index an ORM object given an object id and a document type name."""

Expand All @@ -140,7 +140,7 @@ def index_object(doc_type_name, obj_id):
doc_type.prepare(obj).to_action("index")


@task
@shared_task
def index_objects_bulk(
doc_type_name,
obj_ids,
Expand Down Expand Up @@ -187,7 +187,7 @@ def index_objects_bulk(
raise BulkIndexError(f"{len(errors)} document(s) failed to index.", errors)


@task
@shared_task
def remove_from_field(doc_type_name, field_name, field_value):
"""Remove a value from all documents in the doc_type's index."""
doc_type = next(cls for cls in get_doc_types() if cls.__name__ == doc_type_name)
Expand All @@ -213,7 +213,7 @@ def remove_from_field(doc_type_name, field_name, field_value):
doc_type._index.refresh()


@task
@shared_task
def delete_object(doc_type_name, obj_id):
"""Unindex an ORM object given an object id and document type name."""

Expand Down
4 changes: 2 additions & 2 deletions kitsune/sumo/tasks.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import logging
from datetime import datetime

from celery import task
from celery import shared_task


log = logging.getLogger("k.task")


@task(serializer="pickle")
@shared_task(serializer="pickle")
def measure_queue_lag(queued_time):
"""A task that measures the time it was sitting in the queue."""
lag = datetime.now() - queued_time
Expand Down
4 changes: 2 additions & 2 deletions kitsune/tidings/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from collections.abc import Sequence
from smtplib import SMTPException

from celery import task
from celery import shared_task
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType
Expand Down Expand Up @@ -135,7 +135,7 @@ def fire(self, exclude=None, delay=True):
else:
self._fire_task(self, exclude=exclude)

@task
@shared_task
def _fire_task(self, exclude=None):
"""Build and send the emails as a celery task."""
connection = mail.get_connection(fail_silently=True)
Expand Down
4 changes: 2 additions & 2 deletions kitsune/tidings/tasks.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from celery import task
from celery import shared_task

from kitsune.tidings.models import Watch


@task
@shared_task
def claim_watches(user):
"""Attach any anonymous watches having a user's email to that user.
Expand Down
6 changes: 3 additions & 3 deletions kitsune/upload/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import subprocess
from tempfile import NamedTemporaryFile

from celery import task
from celery import shared_task
from django.conf import settings
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
Expand All @@ -13,7 +13,7 @@
log = logging.getLogger("k.task")


@task(rate_limit="15/m", serializer="pickle")
@shared_task(rate_limit="15/m", serializer="pickle")
def generate_thumbnail(for_obj, from_field, to_field, max_size=settings.THUMBNAIL_SIZE):
"""Generate a thumbnail, given a model instance with from and to fields.
Expand Down Expand Up @@ -100,7 +100,7 @@ def _scale_dimensions(width, height, longest_side=settings.THUMBNAIL_SIZE):
return (new_width, new_height)


@task(rate_limit="15/m", serializer="pickle")
@shared_task(rate_limit="15/m", serializer="pickle")
def compress_image(for_obj, for_field):
"""Compress an image of given field for given object."""

Expand Down
10 changes: 5 additions & 5 deletions kitsune/users/tasks.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import json
from datetime import datetime

from celery import task
from celery import shared_task

from kitsune.products.models import Product
from kitsune.users.auth import FXAAuthBackend
from kitsune.users.models import AccountEvent
from kitsune.users.utils import anonymize_user


@task
@shared_task
def process_event_delete_user(event_id):
event = AccountEvent.objects.get(id=event_id)

Expand All @@ -19,7 +19,7 @@ def process_event_delete_user(event_id):
event.save()


@task
@shared_task
def process_event_subscription_state_change(event_id):
event = AccountEvent.objects.get(id=event_id)
body = json.loads(event.body)
Expand All @@ -45,7 +45,7 @@ def process_event_subscription_state_change(event_id):
event.save()


@task
@shared_task
def process_event_password_change(event_id):
event = AccountEvent.objects.get(id=event_id)
body = json.loads(event.body)
Expand All @@ -63,7 +63,7 @@ def process_event_password_change(event_id):
event.save()


@task
@shared_task
def process_event_profile_change(event_id):
event = AccountEvent.objects.get(id=event_id)
refresh_token = event.profile.fxa_refresh_token
Expand Down
16 changes: 8 additions & 8 deletions kitsune/wiki/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Dict, List

import waffle
from celery import task
from celery import shared_task
from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.sites.models import Site
Expand Down Expand Up @@ -32,7 +32,7 @@
log = logging.getLogger("k.task")


@task()
@shared_task
def send_reviewed_notification(revision_id: int, document_id: int, message: str):
"""Send notification of review to the revision creator."""

Expand Down Expand Up @@ -92,7 +92,7 @@ def _make_mail(locale, user):
email_utils.send_messages(msgs)


@task()
@shared_task
def send_contributor_notification(
based_on_ids: List[int], revision_id: int, document_id: int, message: str
):
Expand Down Expand Up @@ -172,7 +172,7 @@ def schedule_rebuild_kb():
rebuild_kb.delay()


@task
@shared_task
def add_short_links(doc_ids):
"""Create short_url's for a list of docs."""
base_url = "https://{0}%s".format(Site.objects.get_current().domain)
Expand All @@ -188,7 +188,7 @@ def add_short_links(doc_ids):
pass


@task(rate_limit="3/h")
@shared_task(rate_limit="3/h")
def rebuild_kb():
"""Re-render all documents in the KB in chunks."""
cache.delete(settings.WIKI_REBUILD_TOKEN)
Expand All @@ -203,7 +203,7 @@ def rebuild_kb():
_rebuild_kb_chunk.apply_async(args=[chunk])


@task(rate_limit="5/m")
@shared_task(rate_limit="5/m")
def _rebuild_kb_chunk(data):
"""Re-render a chunk of documents.
Expand Down Expand Up @@ -254,7 +254,7 @@ def _rebuild_kb_chunk(data):
transaction.commit()


@task()
@shared_task
def maybe_award_badge(badge_template: Dict, year: int, user_id: int):
"""Award the specific badge to the user if they've earned it."""
try:
Expand Down Expand Up @@ -289,7 +289,7 @@ def maybe_award_badge(badge_template: Dict, year: int, user_id: int):
return True


@task()
@shared_task
def render_document_cascade(base_doc_id):
"""Given a document, render it and all documents that may be affected."""

Expand Down
Loading

0 comments on commit e6491b8

Please sign in to comment.