Skip to content

Commit

Permalink
perf(celery-task): 优化检查节点资产数量的 Celery 任务 (#5052)
Browse files Browse the repository at this point in the history
Co-authored-by: xinwen <coderWen@126.com>
  • Loading branch information
fit2bot and xuxinwen committed Nov 20, 2020
1 parent a626ff5 commit b8f175e
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 23 deletions.
7 changes: 3 additions & 4 deletions apps/assets/tasks/nodes_amount.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
from celery import shared_task

from ops.celery.decorator import register_as_period_task
from assets.utils import check_node_assets_amount
from common.utils import get_logger
from common.utils.timezone import now

logger = get_logger(__file__)


@shared_task()
@register_as_period_task(crontab='* 2 * * *')
@shared_task(queue='celery_heavy_tasks')
def check_node_assets_amount_celery_task():
logger.info(f'>>> {now()} begin check_node_assets_amount_celery_task ...')
check_node_assets_amount()
logger.info(f'>>> {now()} end check_node_assets_amount_celery_task ...')
9 changes: 7 additions & 2 deletions apps/assets/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# ~*~ coding: utf-8 ~*~
#
import time

from django.db.models import Q

from common.utils import get_logger, dict_get_any, is_uuid, get_object_or_none
Expand All @@ -12,15 +14,18 @@

def check_node_assets_amount():
for node in Node.objects.all():
logger.info(f'Check node assets amount: {node}')
assets_amount = Asset.objects.filter(
Q(nodes__key__istartswith=f'{node.key}:') | Q(nodes=node)
).distinct().count()

if node.assets_amount != assets_amount:
print(f'>>> <Node:{node.key}> wrong assets amount '
f'{node.assets_amount} right is {assets_amount}')
logger.warn(f'Node wrong assets amount <Node:{node.key}> '
f'{node.assets_amount} right is {assets_amount}')
node.assets_amount = assets_amount
node.save()
# 防止自检程序给数据库的压力太大
time.sleep(0.1)


def is_asset_exists_in_node(asset_pk, node_key):
Expand Down
13 changes: 0 additions & 13 deletions apps/ops/celery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,3 @@
app.namespace = 'CELERY'
app.conf.update(configs)
app.autodiscover_tasks(lambda: [app_config.split('.')[0] for app_config in settings.INSTALLED_APPS])

app.conf.beat_schedule = {
'check-asset-permission-expired': {
'task': 'perms.tasks.check_asset_permission_expired',
'schedule': settings.PERM_EXPIRED_CHECK_PERIODIC,
'args': ()
},
'check-node-assets-amount': {
'task': 'assets.tasks.nodes_amount.check_node_assets_amount_celery_task',
'schedule': crontab(minute=0, hour=0),
'args': ()
},
}
5 changes: 4 additions & 1 deletion apps/perms/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
from django.db import transaction
from django.db.models import Q
from django.db.transaction import atomic
from django.conf import settings
from celery import shared_task
from common.utils import get_logger
from common.utils.timezone import now, dt_formater, dt_parser
from users.models import User
from ops.celery.decorator import register_as_period_task
from assets.models import Node
from perms.models import RebuildUserTreeTask, AssetPermission
from perms.utils.asset.user_permission import rebuild_user_mapping_nodes_if_need_with_lock, lock
Expand All @@ -33,7 +35,8 @@ def dispatch_mapping_node_tasks():
rebuild_user_mapping_nodes_celery_task.delay(id)


@shared_task(queue='check_asset_perm_expired')
@register_as_period_task(interval=settings.PERM_EXPIRED_CHECK_PERIODIC)
@shared_task(queue='celery_check_asset_perm_expired')
@atomic()
def check_asset_permission_expired():
"""
Expand Down
15 changes: 12 additions & 3 deletions jms
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ def is_running(s, unlink=True):

def parse_service(s):
web_services = ['gunicorn', 'flower', 'daphne']
celery_services = ["celery_ansible", "celery_default", "celery_node_tree", "check_asset_perm_expired"]
celery_services = [
"celery_ansible", "celery_default", "celery_node_tree",
"celery_check_asset_perm_expired", "celery_heavy_tasks"
]
task_services = celery_services + ['beat']
all_services = web_services + task_services
if s == 'all':
Expand Down Expand Up @@ -225,9 +228,14 @@ def get_start_celery_node_tree_kwargs():
return get_start_worker_kwargs('node_tree', 2)


def get_start_celery_heavy_tasks_kwargs():
print("\n- Start Celery as Distributed Task Queue: HeavyTasks")
return get_start_worker_kwargs('celery_heavy_tasks', 1)


def get_start_celery_check_asset_perm_expired_kwargs():
print("\n- Start Celery as Distributed Task Queue: CheckAseetPermissionExpired")
return get_start_worker_kwargs('check_asset_perm_expired', 1)
return get_start_worker_kwargs('celery_check_asset_perm_expired', 1)


def get_start_worker_kwargs(queue, num):
Expand Down Expand Up @@ -366,7 +374,8 @@ def start_service(s):
"celery_ansible": get_start_celery_ansible_kwargs,
"celery_default": get_start_celery_default_kwargs,
"celery_node_tree": get_start_celery_node_tree_kwargs,
"check_asset_perm_expired": get_start_celery_check_asset_perm_expired_kwargs,
"celery_heavy_tasks": get_start_celery_heavy_tasks_kwargs,
"celery_check_asset_perm_expired": get_start_celery_check_asset_perm_expired_kwargs,
"beat": get_start_beat_kwargs,
"flower": get_start_flower_kwargs,
"daphne": get_start_daphne_kwargs,
Expand Down

0 comments on commit b8f175e

Please sign in to comment.