diff --git a/backend/core/settings.py b/backend/core/settings.py
index e3c9b0e7..37f41075 100644
--- a/backend/core/settings.py
+++ b/backend/core/settings.py
@@ -52,6 +52,7 @@
     "tastypie",
     "corsheaders",
     "dqt",
+    "processor.apps.ProcessorConfig",
     "exporter.apps.ExporterConfig",
 ]
 
diff --git a/backend/core/urls.py b/backend/core/urls.py
index 4df91858..8129f8bf 100644
--- a/backend/core/urls.py
+++ b/backend/core/urls.py
@@ -2,5 +2,6 @@
 
 urlpatterns = [
     path("", include("dqt.urls"), name="api"),
+    path("", include("processor.urls"), name="processor"),
     path("", include("exporter.urls"), name="exporter"),
 ]
diff --git a/backend/dqt/urls.py b/backend/dqt/urls.py
index 14c3c9a6..dea73085 100644
--- a/backend/dqt/urls.py
+++ b/backend/dqt/urls.py
@@ -12,17 +12,10 @@
     TimeVarianceLevelCheckResource,
 )
 from .views import (
-    create_dataset_filter,
-    dataset_availability,
     dataset_distinct_values,
     dataset_filter_items,
-    dataset_id,
     dataset_level_stats,
-    dataset_metadata,
-    dataset_progress,
-    dataset_start,
     dataset_stats,
-    dataset_wipe,
     field_level_detail,
     field_level_stats,
     resource_level_detail,
@@ -47,7 +40,6 @@
     path("api/time_variance_level_stats/<int:dataset_id>", time_variance_level_stats, name="time_variance_level_stats"),
     path("api/field_level_detail/<int:dataset_id>/<path:name>", field_level_detail, name="field_level_detail"),
     path("api/resource_level_detail/<int:dataset_id>/<path:name>", resource_level_detail, name="resource_level_detail"),
-    path("api/create_dataset_filter", create_dataset_filter, name="create_dataset_filter"),
     path("api/dataset_filter_items", dataset_filter_items, name="dataset_filter_items"),
     path(
         "api/dataset_distinct_values/<int:dataset_id>/<path:json_path>", dataset_distinct_values, name="dataset_distinct_values"
     ),
@@ -57,12 +49,6 @@
         dataset_distinct_values,
         name="dataset_distinct_values",
     ),
-    path("api/dataset_start", dataset_start, name="dataset_start"),
-    path("api/dataset_status/<int:dataset_id>", dataset_progress, name="dataset_status"),
-    path("api/dataset_id", dataset_id, name="dataset_id"),
-    path("api/dataset_availability/<int:dataset_id>", dataset_availability, name="dataset_availability"),
-    path("api/dataset_metadata/<int:dataset_id>", dataset_metadata, name="dataset_metadata"),
-    path("api/dataset_wipe", dataset_wipe, name="dataset_wipe"),
     re_path(r"^api/", include(dataset_resource.urls)),
     re_path(r"^api/", include(data_item_resource.urls)),
     re_path(r"^api/", include(progress_monitor_dataset_resource.urls)),
diff --git a/backend/dqt/views.py b/backend/dqt/views.py
index 8c201636..3f17dc51 100644
--- a/backend/dqt/views.py
+++ b/backend/dqt/views.py
@@ -3,29 +3,11 @@
 import simplejson as json
 from django.db import connections
 from django.db.models import Count
-from django.http import HttpResponse, HttpResponseBadRequest, JsonResponse
+from django.http import HttpResponseBadRequest, JsonResponse
 from django.views.decorators.csrf import csrf_exempt
-from psycopg2.sql import SQL, Identifier
+from psycopg2.sql import SQL
 
-from .models import (
-    DataItem,
-    Dataset,
-    DatasetLevelCheck,
-    FieldLevelCheck,
-    ProgressMonitorDataset,
-    TimeVarianceLevelCheck,
-)
-from .tools.rabbit import publish
-
-
-@csrf_exempt
-def create_dataset_filter(request):
-    if request.method == "GET":
-        return HttpResponseBadRequest(reason="Only post method is accepted.")
-
-    publish(request.body, "_dataset_filter_extractor_init")
-
-    return HttpResponse("done")
+from .models import DataItem, Dataset, DatasetLevelCheck, TimeVarianceLevelCheck
 
 
 @csrf_exempt
@@ -263,135 +245,3 @@
             "meta": check.meta,
         }
     return JsonResponse(result)
-
-
-@csrf_exempt
-def dataset_start(request):
-    if request.method == "GET":
-        return JsonResponse({"status": "error", "data": {"reason": "Only post method is accepted."}})
-
-    routing_key = "_ocds_kingfisher_extractor_init"
-
-    body = json.loads(request.body.decode("utf-8"))
-
-    dataset_name = body.get("name")
-
-    message = {
-        "name": dataset_name,
-        "collection_id": body.get("collection_id"),
-        # "ancestor_id": ancestor_id,
-        # "max_items": max_items,
-    }
-
-    publish(json.dumps(message), routing_key)
-
-    return JsonResponse(
-        {"status": "ok", "data": {"message": f"Dataset {dataset_name} on Pelican started"}}, safe=False
-    )
-
-
-@csrf_exempt
-def dataset_wipe(request):
-    if request.method == "GET":
-        return JsonResponse({"status": "error", "data": {"reason": "Only post method is accepted."}})
-
-    routing_key = "_wiper_init"
-
-    body = json.loads(request.body.decode("utf-8"))
-
-    message = {
-        "dataset_id": body.get("dataset_id"),
-    }
-
-    publish(json.dumps(message), routing_key)
-
-    return JsonResponse(
-        {"status": "ok", "data": {"message": f"Dataset id {body.get('dataset_id')} on Pelican will be wiped"}},
-        safe=False,
-    )
-
-
-@csrf_exempt
-def dataset_progress(request, dataset_id):
-    try:
-        monitor = ProgressMonitorDataset.objects.values("state", "phase").get(dataset__id=dataset_id)
-        return JsonResponse({"status": "ok", "data": monitor}, safe=False)
-    except ProgressMonitorDataset.DoesNotExist:
-        return JsonResponse({"status": "ok", "data": None}, safe=False)
-
-
-@csrf_exempt
-def dataset_id(request):
-    if request.method == "GET":
-        return JsonResponse({"status": "error", "data": {"reason": "Only post method is accepted."}})
-
-    body = json.loads(request.body.decode("utf-8"))
-    dataset_name = body.get("name")
-
-    dataset = Dataset.objects.get(name=dataset_name)
-    return JsonResponse({"status": "ok", "data": dataset.id if dataset else None}, safe=False)
-
-
-@csrf_exempt
-def dataset_availability(request, dataset_id):
-    map = {
-        "parties": ["parties.id"],
-        "plannings": ["planning.budget"],
-        "tenders": ["tender.id"],
-        "tenderers": ["tenderers.id"],
-        "tenders_items": ["tender.items.id"],
-        "awards": ["awards.id"],
-        "awards_items": ["awards.items.id"],
-        "awards_suppliers": ["awards.suppliers.id"],
-        "contracts": ["contracts.id"],
-        "contracts_items": ["contracts.items.id"],
-        "contracts_transactions": ["contracts.implementation.transactions.id"],
-        "documents": [
-            "planning.documents.id",
-            "tender.documents.id",
-            "awards.documents.id",
-            "contracts.documents.id",
-            "contracts.implementation.documents.id",
-        ],
-        "milestones": [
-            "planning.milestones.id",
-            "tender.milestones.id",
-            "contracts.milestones.id",
-            "contracts.implementation.milestones.id",
-        ],
-        "amendments": ["tender.amendments.id", "awards.amendments.id", "contract.amendments.id"],
-    }
-
-    with connections["data"].cursor() as cursor:
-        statement = """
-            SELECT c.key AS check, SUM(jsonb_array_length(c.value)) AS count
-            FROM {table} flc, jsonb_each(flc.result->'checks') c
-            WHERE dataset_id = %(dataset_id)s
-            AND c.key IN %(checks)s
-            GROUP BY c.key
-            ORDER BY c.key
-        """
-
-        cursor.execute(
-            SQL(statement).format(table=Identifier(FieldLevelCheck._meta.db_table)),
-            {"checks": tuple(j for i in map.values() for j in i), "dataset_id": dataset_id},
-        )
-
-        results = cursor.fetchall()
-
-    counts = {}
-    for key, items in map.items():
-        counts[key] = 0
-        for i in items:
-            for r in results:
-                if r[0] == i:
-                    counts[key] += int(r[1])
-
-    return JsonResponse({"status": "ok", "data": counts}, safe=False)
-
-
-@csrf_exempt
-def dataset_metadata(request, dataset_id):
-    meta = Dataset.objects.values_list("meta__collection_metadata", flat=True).get(id=dataset_id)
-
-    return JsonResponse({"status": "ok", "data": meta}, safe=False)
diff --git a/backend/dqt/tools/__init__.py b/backend/processor/__init__.py
similarity index 100%
rename from backend/dqt/tools/__init__.py
rename to backend/processor/__init__.py
diff --git a/backend/processor/apps.py b/backend/processor/apps.py
new file mode 100644
index 00000000..a8c8552c
--- /dev/null
+++ b/backend/processor/apps.py
@@ -0,0 +1,6 @@
+from django.apps import AppConfig
+
+
+class ProcessorConfig(AppConfig):
+    default_auto_field = "django.db.models.BigAutoField"
+    name = "processor"
diff --git a/backend/dqt/tools/rabbit.py b/backend/processor/rabbitmq.py
similarity index 100%
rename from backend/dqt/tools/rabbit.py
rename to backend/processor/rabbitmq.py
diff --git a/backend/processor/urls.py b/backend/processor/urls.py
new file mode 100644
index 00000000..dcc7c53e
--- /dev/null
+++ b/backend/processor/urls.py
@@ -0,0 +1,21 @@
+from django.urls import path
+
+from .views import (
+    create_dataset_filter,
+    dataset_availability,
+    dataset_id,
+    dataset_metadata,
+    dataset_progress,
+    dataset_start,
+    dataset_wipe,
+)
+
+urlpatterns = [
+    path("api/create_dataset_filter", create_dataset_filter, name="create_dataset_filter"),
+    path("api/dataset_status/<int:dataset_id>", dataset_progress, name="dataset_status"),
+    path("api/dataset_id", dataset_id, name="dataset_id"),
+    path("api/dataset_availability/<int:dataset_id>", dataset_availability, name="dataset_availability"),
+    path("api/dataset_metadata/<int:dataset_id>", dataset_metadata, name="dataset_metadata"),
+    path("api/dataset_start", dataset_start, name="dataset_start"),
+    path("api/dataset_wipe", dataset_wipe, name="dataset_wipe"),
+]
diff --git a/backend/processor/views.py b/backend/processor/views.py
new file mode 100644
index 00000000..b66a8a06
--- /dev/null
+++ b/backend/processor/views.py
@@ -0,0 +1,150 @@
+import simplejson as json
+from django.db import connections
+from django.http import HttpResponse, HttpResponseBadRequest, JsonResponse
+from django.views.decorators.csrf import csrf_exempt
+from dqt.models import Dataset, FieldLevelCheck, ProgressMonitorDataset
+from psycopg2.sql import SQL, Identifier
+
+from .rabbitmq import publish
+
+
+@csrf_exempt
+def create_dataset_filter(request):
+    if request.method == "GET":
+        return HttpResponseBadRequest(reason="Only post method is accepted.")
+
+    publish(request.body, "_dataset_filter_extractor_init")
+
+    return HttpResponse("done")
+
+
+@csrf_exempt
+def dataset_start(request):
+    if request.method == "GET":
+        return JsonResponse({"status": "error", "data": {"reason": "Only post method is accepted."}})
+
+    routing_key = "_ocds_kingfisher_extractor_init"
+
+    body = json.loads(request.body.decode("utf-8"))
+
+    dataset_name = body.get("name")
+
+    message = {
+        "name": dataset_name,
+        "collection_id": body.get("collection_id"),
+        # "ancestor_id": ancestor_id,
+        # "max_items": max_items,
+    }
+
+    publish(json.dumps(message), routing_key)
+
+    return JsonResponse(
+        {"status": "ok", "data": {"message": f"Dataset {dataset_name} on Pelican started"}}, safe=False
+    )
+
+
+@csrf_exempt
+def dataset_wipe(request):
+    if request.method == "GET":
+        return JsonResponse({"status": "error", "data": {"reason": "Only post method is accepted."}})
+
+    routing_key = "_wiper_init"
+
+    body = json.loads(request.body.decode("utf-8"))
+
+    message = {
+        "dataset_id": body.get("dataset_id"),
+    }
+
+    publish(json.dumps(message), routing_key)
+
+    return JsonResponse(
"data": {"message": f"Dataset id {body.get('dataset_id')} on Pelican will be wiped"}}, + safe=False, + ) + + +@csrf_exempt +def dataset_progress(request, dataset_id): + try: + monitor = ProgressMonitorDataset.objects.values("state", "phase").get(dataset__id=dataset_id) + return JsonResponse({"status": "ok", "data": monitor}, safe=False) + except ProgressMonitorDataset.DoesNotExist: + return JsonResponse({"status": "ok", "data": None}, safe=False) + + +@csrf_exempt +def dataset_id(request): + if request.method == "GET": + return JsonResponse({"status": "error", "data": {"reason": "Only post method is accepted."}}) + + body = json.loads(request.body.decode("utf-8")) + dataset_name = body.get("name") + + dataset = Dataset.objects.get(name=dataset_name) + return JsonResponse({"status": "ok", "data": dataset.id if dataset else None}, safe=False) + + +@csrf_exempt +def dataset_availability(request, dataset_id): + map = { + "parties": ["parties.id"], + "plannings": ["planning.budget"], + "tenders": ["tender.id"], + "tenderers": ["tenderers.id"], + "tenders_items": ["tender.items.id"], + "awards": ["awards.id"], + "awards_items": ["awards.items.id"], + "awards_suppliers": ["awards.suppliers.id"], + "contracts": ["contracts.id"], + "contracts_items": ["contracts.items.id"], + "contracts_transactions": ["contracts.implementation.transactions.id"], + "documents": [ + "planning.documents.id", + "tender.documents.id", + "awards.documents.id", + "contracts.documents.id", + "contracts.implementation.documents.id", + ], + "milestones": [ + "planning.milestones.id", + "tender.milestones.id", + "contracts.milestones.id", + "contracts.implementation.milestones.id", + ], + "amendments": ["tender.amendments.id", "awards.amendments.id", "contract.amendments.id"], + } + + with connections["data"].cursor() as cursor: + statement = """ + SELECT c.key AS check, SUM(jsonb_array_length(c.value)) AS count + FROM {table} flc, jsonb_each(flc.result->'checks') c + WHERE dataset_id = %(dataset_id)s + AND c.key IN %(checks)s + GROUP BY c.key + ORDER BY c.key + """ + + cursor.execute( + SQL(statement).format(table=Identifier(FieldLevelCheck._meta.db_table)), + {"checks": tuple(j for i in map.values() for j in i), "dataset_id": dataset_id}, + ) + + results = cursor.fetchall() + + counts = {} + for key, items in map.items(): + counts[key] = 0 + for i in items: + for r in results: + if r[0] == i: + counts[key] += int(r[1]) + + return JsonResponse({"status": "ok", "data": counts}, safe=False) + + +@csrf_exempt +def dataset_metadata(request, dataset_id): + meta = Dataset.objects.values_list("meta__collection_metadata", flat=True).get(id=dataset_id) + + return JsonResponse({"status": "ok", "data": meta}, safe=False) diff --git a/docs/changelog.rst b/docs/changelog.rst index aa30566b..5fa14aac 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -6,5 +6,5 @@ This changelog only notes major changes, to notify other developers. 2021-11-08 ---------- -- refactor: Split Django applications from Django project. :commit:`df4b678` +- refactor: Split Django applications from Django project. :commit:`df4b678` :commit:`fe94f41` - refactor: Move static assets out of code directory. :commit:`80bbd09`