
Split the default queue into import/export, fixed django-rq admin page (#…
azhavoro committed Jan 6, 2023
1 parent 0ea14d2 commit fd7d802
Showing 25 changed files with 451 additions and 144 deletions.
34 changes: 28 additions & 6 deletions .vscode/launch.json
@@ -133,7 +133,7 @@
}
},
{
"name": "server: RQ - default",
"name": "server: RQ - import",
"type": "python",
"request": "launch",
"stopOnEntry": false,
@@ -142,7 +142,26 @@
"program": "${workspaceRoot}/manage.py",
"args": [
"rqworker",
"default",
"import",
"--worker-class",
"cvat.rqworker.SimpleWorker",
],
"django": true,
"cwd": "${workspaceFolder}",
"env": {},
"console": "internalConsole"
},
{
"name": "server: RQ - export",
"type": "python",
"request": "launch",
"stopOnEntry": false,
"justMyCode": false,
"python": "${command:python.interpreterPath}",
"program": "${workspaceRoot}/manage.py",
"args": [
"rqworker",
"export",
"--worker-class",
"cvat.rqworker.SimpleWorker",
],
@@ -161,14 +180,16 @@
"program": "${workspaceRoot}/manage.py",
"args": [
"rqscheduler",
"--queue",
"export"
],
"django": true,
"cwd": "${workspaceFolder}",
"env": {},
"console": "internalConsole"
},
{
"name": "server: RQ - low",
"name": "server: RQ - annotation",
"type": "python",
"request": "launch",
"justMyCode": false,
@@ -177,7 +198,7 @@
"program": "${workspaceRoot}/manage.py",
"args": [
"rqworker",
"low",
"annotation",
"--worker-class",
"cvat.rqworker.SimpleWorker",
],
@@ -379,8 +400,9 @@
"name": "server: debug",
"configurations": [
"server: django",
"server: RQ - default",
"server: RQ - low",
"server: RQ - import",
"server: RQ - export",
"server: RQ - annotation",
"server: RQ - webhooks",
"server: RQ - scheduler",
"server: git",
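The debug launch configurations above now start one worker per queue: "import", "export", and "annotation" replace the old "default" and "low" workers, and the rqscheduler process is pinned to the export queue via `--queue export`. For `manage.py rqworker <name>` and `django_rq.get_queue(<name>)` to resolve these names, matching entries must exist in django-rq's `RQ_QUEUES` setting, and the Python changes below reference them through a `settings.CVAT_QUEUES` enum. The settings module itself is not part of this excerpt, so the following is only a hedged sketch of what it plausibly contains; the enum member names beyond IMPORT_DATA/EXPORT_DATA and the Redis connection details are assumptions.

```python
# Hypothetical sketch of the settings backing the new queue names (not taken
# from this commit's settings diff). Queue names mirror the rqworker args in
# launch.json; CVAT_QUEUES mirrors the settings.CVAT_QUEUES.*.value usage below.
from enum import Enum

class CVAT_QUEUES(Enum):
    IMPORT_DATA = 'import'
    EXPORT_DATA = 'export'
    AUTO_ANNOTATION = 'annotation'   # assumed member name for the "annotation" worker

_redis_connection = {
    'HOST': 'localhost',
    'PORT': 6379,
    'DB': 0,
}

# django_rq.get_queue(name), django_rq.get_scheduler(name) and
# `manage.py rqworker <name>` all look queues up by key in RQ_QUEUES,
# so every queue name used in the code needs an entry here.
RQ_QUEUES = {
    queue.value: dict(_redis_connection, DEFAULT_TIMEOUT=4 * 60 * 60)
    for queue in CVAT_QUEUES
}
```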
2 changes: 1 addition & 1 deletion components/serverless/docker-compose.serverless.yml
@@ -28,6 +28,6 @@ services:
extra_hosts:
- "host.docker.internal:host-gateway"

cvat_worker_low:
cvat_worker_annotation:
extra_hosts:
- "host.docker.internal:host-gateway"
3 changes: 2 additions & 1 deletion cvat/apps/dataset_manager/views.py
@@ -11,6 +11,7 @@
from datumaro.util.os_util import make_file_name
from datumaro.util import to_snake_case
from django.utils import timezone
from django.conf import settings

import cvat.apps.dataset_manager.task as task
import cvat.apps.dataset_manager.project as project
@@ -83,7 +84,7 @@ def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=No
os.replace(temp_file, output_path)

archive_ctime = osp.getctime(output_path)
scheduler = django_rq.get_scheduler()
scheduler = django_rq.get_scheduler(settings.CVAT_QUEUES.EXPORT_DATA.value)
cleaning_job = scheduler.enqueue_in(time_delta=cache_ttl,
func=clear_export_cache,
file_path=output_path,
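`django_rq.get_scheduler()` previously defaulted to the "default" queue; since the deferred `clear_export_cache` job now has to land on the export queue (the only queue the rqscheduler process watches after the `--queue export` change above), the scheduler is fetched by name. A minimal sketch of the pattern, with placeholders for everything that is not visible in the diff:

```python
# Minimal sketch of the delayed cache-cleanup pattern used above. The queue
# name and enqueue_in() call come from the diff; clear_export_cache below is a
# placeholder, since the real function also receives the archive ctime and a
# logger that are not fully shown here.
import os
from datetime import timedelta

import django_rq
from django.conf import settings


def clear_export_cache(file_path: str) -> None:
    # Placeholder for cvat.apps.dataset_manager.views.clear_export_cache.
    if os.path.exists(file_path):
        os.remove(file_path)


def schedule_export_cleanup(output_path: str, cache_ttl: timedelta = timedelta(hours=1)) -> None:
    # get_scheduler() must name the same queue that `rqscheduler --queue export`
    # watches, otherwise the deferred job is never moved onto a worker queue.
    scheduler = django_rq.get_scheduler(settings.CVAT_QUEUES.EXPORT_DATA.value)
    scheduler.enqueue_in(time_delta=cache_ttl, func=clear_export_cache, file_path=output_path)
```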
3 changes: 2 additions & 1 deletion cvat/apps/dataset_repo/dataset_repo.py
@@ -15,6 +15,7 @@
import git
from django.db import transaction
from django.utils import timezone
from django.conf import settings

from cvat.apps.dataset_manager.formats.registry import format_for
from cvat.apps.dataset_manager.task import export_task
@@ -427,7 +428,7 @@ def get(tid, user):
response['url']['value'] = '{} [{}]'.format(db_git.url, db_git.path)
try:
rq_id = "git.push.{}".format(tid)
queue = django_rq.get_queue('default')
queue = django_rq.get_queue(settings.CVAT_QUEUES.EXPORT_DATA.value)
rq_job = queue.fetch_job(rq_id)
if rq_job is not None and (rq_job.is_queued or rq_job.is_started):
db_git.status = GitStatusChoice.SYNCING
7 changes: 4 additions & 3 deletions cvat/apps/dataset_repo/views.py
@@ -5,6 +5,7 @@

from django.http import HttpResponseBadRequest, HttpResponse
from rules.contrib.views import permission_required, objectgetter
from django.conf import settings

from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
@@ -37,7 +38,7 @@ def decorator(view):
@_legacy_api_view()
def check_process(request, rq_id):
try:
queue = django_rq.get_queue('default')
queue = django_rq.get_queue(settings.CVAT_QUEUES.EXPORT_DATA.value)
rq_job = queue.fetch_job(rq_id)

if rq_job is not None:
@@ -65,7 +66,7 @@ def create(request: Request, tid):
export_format = body.get("format")
lfs = body["lfs"]
rq_id = "git.create.{}".format(tid)
queue = django_rq.get_queue("default")
queue = django_rq.get_queue(settings.CVAT_QUEUES.EXPORT_DATA.value)

queue.enqueue_call(func = CVATGit.initial_create, args = (tid, path, export_format, lfs, request.user), job_id = rq_id)
return Response({ "rq_id": rq_id })
@@ -80,7 +81,7 @@ def push_repository(request: Request, tid):
slogger.task[tid].info("push repository request")

rq_id = "git.push.{}".format(tid)
queue = django_rq.get_queue('default')
queue = django_rq.get_queue(settings.CVAT_QUEUES.EXPORT_DATA.value)
queue.enqueue_call(func = CVATGit.push, args = (tid, request.user, request.scheme, request.get_host()), job_id = rq_id)

return Response({ "rq_id": rq_id })
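All three git-sync entry points (check_process, create, push_repository) now enqueue to and poll the export queue, so the producer and the status check resolve the same job by its deterministic id. A hedged sketch of that polling pattern, based only on what is visible in this diff:

```python
# Sketch of the status-polling pattern used by check_process()/get(): a job is
# addressed by a deterministic rq_id on the export queue. The status strings
# here are illustrative, not the exact GitStatusChoice values.
import django_rq
from django.conf import settings


def git_push_status(tid: int) -> str:
    queue = django_rq.get_queue(settings.CVAT_QUEUES.EXPORT_DATA.value)
    rq_job = queue.fetch_job(f"git.push.{tid}")

    if rq_job is None:
        return "unknown"              # never enqueued, or already expired from Redis
    if rq_job.is_queued or rq_job.is_started:
        return "syncing"
    if rq_job.is_failed:
        return "failed"
    return "finished"
```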
33 changes: 19 additions & 14 deletions cvat/apps/engine/backup.py
@@ -712,7 +712,7 @@ def _create_backup(db_instance, Exporter, output_path, logger, cache_ttl):
os.replace(temp_file, output_path)

archive_ctime = os.path.getctime(output_path)
scheduler = django_rq.get_scheduler()
scheduler = django_rq.get_scheduler(settings.CVAT_QUEUES.IMPORT_DATA.value)
cleaning_job = scheduler.enqueue_in(time_delta=cache_ttl,
func=clear_export_cache,
file_path=output_path,
@@ -731,7 +731,7 @@ def _create_backup(db_instance, Exporter, output_path, logger, cache_ttl):
log_exception(logger)
raise

def export(db_instance, request):
def export(db_instance, request, queue_name):
action = request.query_params.get('action', None)
filename = request.query_params.get('filename', None)

@@ -740,13 +740,13 @@ def export(db_instance, request):
"Unexpected action specified for the request")

if isinstance(db_instance, Task):
filename_prefix = 'task'
obj_type = 'task'
logger = slogger.task[db_instance.pk]
Exporter = TaskExporter
cache_ttl = TASK_CACHE_TTL
use_target_storage_conf = request.query_params.get('use_default_location', True)
elif isinstance(db_instance, Project):
filename_prefix = 'project'
obj_type = 'project'
logger = slogger.project[db_instance.pk]
Exporter = ProjectExporter
cache_ttl = PROJECT_CACHE_TTL
@@ -762,8 +762,8 @@
field_name=StorageType.TARGET
)

queue = django_rq.get_queue("default")
rq_id = "/api/{}s/{}/backup".format(filename_prefix, db_instance.pk)
queue = django_rq.get_queue(queue_name)
rq_id = f"export:{obj_type}.id{db_instance.pk}-by-{request.user}"
rq_job = queue.fetch_job(rq_id)
if rq_job:
last_project_update_time = timezone.localtime(db_instance.updated_date)
@@ -780,7 +780,7 @@
timestamp = datetime.strftime(last_project_update_time,
"%Y_%m_%d_%H_%M_%S")
filename = filename or "{}_{}_backup_{}{}".format(
filename_prefix, db_instance.name, timestamp,
obj_type, db_instance.name, timestamp,
os.path.splitext(file_path)[1]).lower()

location = location_conf.get('location')
@@ -820,7 +820,7 @@
ttl = dm.views.PROJECT_CACHE_TTL.total_seconds()
queue.enqueue_call(
func=_create_backup,
args=(db_instance, Exporter, '{}_backup.zip'.format(filename_prefix), logger, cache_ttl),
args=(db_instance, Exporter, '{}_backup.zip'.format(obj_type), logger, cache_ttl),
job_id=rq_id,
meta={ 'request_time': timezone.localtime() },
result_ttl=ttl, failure_ttl=ttl)
@@ -834,8 +834,7 @@ def _download_file_from_bucket(db_storage, filename, key):
with open(filename, 'wb+') as f:
f.write(data.getbuffer())

def _import(importer, request, rq_id, Serializer, file_field_name, location_conf, filename=None):
queue = django_rq.get_queue("default")
def _import(importer, request, queue, rq_id, Serializer, file_field_name, location_conf, filename=None):
rq_job = queue.fetch_job(rq_id)

if not rq_job:
@@ -905,11 +904,11 @@ def _import(importer, request, rq_id, Serializer, file_field_name, location_conf
def get_backup_dirname():
return settings.TMP_FILES_ROOT

def import_project(request, filename=None):
def import_project(request, queue_name, filename=None):
if 'rq_id' in request.data:
rq_id = request.data['rq_id']
else:
rq_id = "{}@/api/projects/{}/import".format(request.user, uuid.uuid4())
rq_id = f"import:project.{uuid.uuid4()}-by-{request.user}"
Serializer = ProjectFileSerializer
file_field_name = 'project_file'

@@ -918,21 +917,24 @@ def import_project(request, filename=None):
field_name=StorageType.SOURCE,
)

queue = django_rq.get_queue(queue_name)

return _import(
importer=_import_project,
request=request,
queue=queue,
rq_id=rq_id,
Serializer=Serializer,
file_field_name=file_field_name,
location_conf=location_conf,
filename=filename
)

def import_task(request, filename=None):
def import_task(request, queue_name, filename=None):
if 'rq_id' in request.data:
rq_id = request.data['rq_id']
else:
rq_id = "{}@/api/tasks/{}/import".format(request.user, uuid.uuid4())
rq_id = f"import:task.{uuid.uuid4()}-by-{request.user}"
Serializer = TaskFileSerializer
file_field_name = 'task_file'

@@ -941,9 +943,12 @@ def import_task(request, filename=None):
field_name=StorageType.SOURCE
)

queue = django_rq.get_queue(queue_name)

return _import(
importer=_import_task,
request=request,
queue=queue,
rq_id=rq_id,
Serializer=Serializer,
file_field_name=file_field_name,
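Besides routing backups through an explicit queue_name, this file replaces the URL-shaped rq_ids with structured ones that encode the operation, the object type, and the requesting user. The format strings below are taken from the diff; the concrete values are only examples:

```python
# Illustration of the new rq_id formats used by backup export/import.
import uuid

obj_type = "task"      # or "project"
pk = 42
user = "admin"

export_rq_id = f"export:{obj_type}.id{pk}-by-{user}"
# -> "export:task.id42-by-admin"
# Deterministic per (object, user): a repeated export request by the same user
# finds the existing job via queue.fetch_job(rq_id) instead of enqueueing a duplicate.

import_rq_id = f"import:{obj_type}.{uuid.uuid4()}-by-{user}"
# -> e.g. "import:task.9b2f4c0e-...-by-admin"
# Random per request: every uploaded backup starts a fresh restore job, and the
# views accept an rq_id from request.data so the client can reference the same
# job on follow-up requests.
```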
17 changes: 13 additions & 4 deletions cvat/apps/engine/mixins.py
@@ -246,7 +246,7 @@ def upload_finished(self, request):

class AnnotationMixin:
def export_annotations(self, request, pk, db_obj, export_func, callback, get_data=None):
format_name = request.query_params.get("format")
format_name = request.query_params.get("format", "")
action = request.query_params.get("action", "").lower()
filename = request.query_params.get("filename", "")

@@ -259,7 +259,8 @@ def export_annotations(self, request, pk, db_obj, export_func, callback, get_dat
field_name=StorageType.TARGET,
)

rq_id = "/api/{}/{}/annotations/{}".format(self._object.__class__.__name__.lower(), pk, format_name)
object_name = self._object.__class__.__name__.lower()
rq_id = f"export:annotations-for-{object_name}.id{pk}-in-{format_name.replace(' ', '_')}-format"

if format_name:
return export_func(db_instance=self._object,
@@ -316,13 +317,21 @@ def import_annotations(self, request, pk, db_obj, import_func, rq_func, rq_id):
class SerializeMixin:
def serialize(self, request, export_func):
db_object = self.get_object() # force to call check_object_permissions
return export_func(db_object, request)
return export_func(
db_object,
request,
queue_name=settings.CVAT_QUEUES.EXPORT_DATA.value,
)

def deserialize(self, request, import_func):
location = request.query_params.get("location", Location.LOCAL)
if location == Location.CLOUD_STORAGE:
file_name = request.query_params.get("filename", "")
return import_func(request, filename=file_name)
return import_func(
request,
queue_name=settings.CVAT_QUEUES.IMPORT_DATA.value,
filename=file_name,
)
return self.upload_data(request)


6 changes: 3 additions & 3 deletions cvat/apps/engine/task.py
@@ -38,11 +38,11 @@

############################# Low Level server API

def create(tid, data):
def create(tid, data, username):
"""Schedule the task"""
q = django_rq.get_queue('default')
q = django_rq.get_queue(settings.CVAT_QUEUES.IMPORT_DATA.value)
q.enqueue_call(func=_create_thread, args=(tid, data),
job_id="/api/tasks/{}".format(tid))
job_id=f"create:task.id{tid}-by-{username}")

@transaction.atomic
def rq_handler(job, exc_type, exc_value, traceback):
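`create()` now needs the caller to supply the username used in the job id and enqueues task creation on the import queue. The call site lives in `cvat/apps/engine/views.py`, which is not part of this excerpt, so the following is only a hypothetical sketch of how it would be invoked:

```python
# Hypothetical call site for the updated create() signature; the real caller is
# not shown in this diff. The username ends up in the job id
# "create:task.id{tid}-by-{username}", which is more readable in the django-rq
# admin page than the old URL-style "/api/tasks/{tid}" id.
from cvat.apps.engine import task as task_api


def start_task_creation(db_task, data, request):
    task_api.create(db_task.pk, data, request.user.username)
```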
