Skip to content

Commit

Permalink
refactor: Split RabbitMQ-related and data registry-related views into new processor app
Browse files Browse the repository at this point in the history
  • Loading branch information
jpmckinney committed Nov 9, 2021
1 parent fe94f41 commit f01bcaf
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 168 deletions.
1 change: 1 addition & 0 deletions backend/core/settings.py
Expand Up @@ -52,6 +52,7 @@
"tastypie",
"corsheaders",
"dqt",
"processor.apps.ProcessorConfig",
"exporter.apps.ExporterConfig",
]

Expand Down
1 change: 1 addition & 0 deletions backend/core/urls.py
Expand Up @@ -2,5 +2,6 @@

urlpatterns = [
path("", include("dqt.urls"), name="api"),
path("", include("processor.urls"), name="processor"),
path("", include("exporter.urls"), name="exporter"),
]
14 changes: 0 additions & 14 deletions backend/dqt/urls.py
Expand Up @@ -12,17 +12,10 @@
TimeVarianceLevelCheckResource,
)
from .views import (
create_dataset_filter,
dataset_availability,
dataset_distinct_values,
dataset_filter_items,
dataset_id,
dataset_level_stats,
dataset_metadata,
dataset_progress,
dataset_start,
dataset_stats,
dataset_wipe,
field_level_detail,
field_level_stats,
resource_level_detail,
Expand All @@ -47,7 +40,6 @@
path("api/time_variance_level_stats/<dataset_id>", time_variance_level_stats, name="time_variance_level_stats"),
path("api/field_level_detail/<dataset_id>/<path>", field_level_detail, name="field_level_detail"),
path("api/resource_level_detail/<dataset_id>/<check_name>", resource_level_detail, name="resource_level_detail"),
path("api/create_dataset_filter", create_dataset_filter, name="create_dataset_filter"),
path("api/dataset_filter_items", dataset_filter_items, name="dataset_filter_items"),
path(
"api/dataset_distinct_values/<dataset_id>/<json_path>", dataset_distinct_values, name="dataset_distinct_values"
Expand All @@ -57,12 +49,6 @@
dataset_distinct_values,
name="dataset_distinct_values",
),
path("api/dataset_start", dataset_start, name="dataset_start"),
path("api/dataset_status/<dataset_id>", dataset_progress, name="dataset_status"),
path("api/dataset_id", dataset_id, name="dataset_id"),
path("api/dataset_availability/<dataset_id>", dataset_availability, name="dataset_availability"),
path("api/dataset_metadata/<dataset_id>", dataset_metadata, name="dataset_metadata"),
path("api/dataset_wipe", dataset_wipe, name="dataset_wipe"),
re_path(r"^api/", include(dataset_resource.urls)),
re_path(r"^api/", include(data_item_resource.urls)),
re_path(r"^api/", include(progress_monitor_dataset_resource.urls)),
Expand Down
156 changes: 3 additions & 153 deletions backend/dqt/views.py
Expand Up @@ -3,29 +3,11 @@
import simplejson as json
from django.db import connections
from django.db.models import Count
from django.http import HttpResponse, HttpResponseBadRequest, JsonResponse
from django.http import HttpResponseBadRequest, JsonResponse
from django.views.decorators.csrf import csrf_exempt
from psycopg2.sql import SQL, Identifier
from psycopg2.sql import SQL

from .models import (
DataItem,
Dataset,
DatasetLevelCheck,
FieldLevelCheck,
ProgressMonitorDataset,
TimeVarianceLevelCheck,
)
from .tools.rabbit import publish


@csrf_exempt
def create_dataset_filter(request):
    """Publish the raw request body to RabbitMQ to start building a filtered dataset.

    Expects a POST whose body is the JSON filter specification; the payload is
    forwarded as-is to the ``_dataset_filter_extractor_init`` routing key.
    Returns a plain "done" response on success.
    """
    # Reject every non-POST method (the original only rejected GET, which let
    # PUT/DELETE etc. through despite the error message).
    if request.method != "POST":
        return HttpResponseBadRequest(reason="Only post method is accepted.")

    publish(request.body, "_dataset_filter_extractor_init")

    return HttpResponse("done")
from .models import DataItem, Dataset, DatasetLevelCheck, TimeVarianceLevelCheck


@csrf_exempt
Expand Down Expand Up @@ -263,135 +245,3 @@ def time_variance_level_stats(request, dataset_id):
"meta": check.meta,
}
return JsonResponse(result)


@csrf_exempt
def dataset_start(request):
    """Publish a message to RabbitMQ to start processing a dataset.

    Expects a POST whose JSON body contains ``name`` and ``collection_id``.
    Returns a ``{"status": ..., "data": ...}`` JSON envelope.
    """
    # Reject every non-POST method (the original only rejected GET, which let
    # PUT/DELETE etc. through despite the error message).
    if request.method != "POST":
        return JsonResponse({"status": "error", "data": {"reason": "Only post method is accepted."}})

    routing_key = "_ocds_kingfisher_extractor_init"

    body = json.loads(request.body.decode("utf-8"))

    dataset_name = body.get("name")

    message = {
        "name": dataset_name,
        "collection_id": body.get("collection_id"),
        # "ancestor_id": ancestor_id,
        # "max_items": max_items,
    }

    publish(json.dumps(message), routing_key)

    return JsonResponse(
        {"status": "ok", "data": {"message": f"Dataset {dataset_name} on Pelican started"}}, safe=False
    )


@csrf_exempt
def dataset_wipe(request):
    """Publish a message to RabbitMQ requesting deletion of a dataset.

    Expects a POST whose JSON body contains ``dataset_id``. The wipe itself is
    asynchronous; this only enqueues the request on the ``_wiper_init`` key.
    """
    # Reject every non-POST method (the original only rejected GET, which let
    # PUT/DELETE etc. through despite the error message).
    if request.method != "POST":
        return JsonResponse({"status": "error", "data": {"reason": "Only post method is accepted."}})

    routing_key = "_wiper_init"

    body = json.loads(request.body.decode("utf-8"))

    message = {
        "dataset_id": body.get("dataset_id"),
    }

    publish(json.dumps(message), routing_key)

    return JsonResponse(
        {"status": "ok", "data": {"message": f"Dataset id {body.get('dataset_id')} on Pelican will be wiped"}},
        safe=False,
    )


@csrf_exempt
def dataset_progress(request, dataset_id):
    """Return the processing ``state`` and ``phase`` for a dataset, or null data if no monitor exists."""
    queryset = ProgressMonitorDataset.objects.values("state", "phase")
    try:
        payload = queryset.get(dataset__id=dataset_id)
    except ProgressMonitorDataset.DoesNotExist:
        # No monitor row yet: report success with an empty payload.
        payload = None
    return JsonResponse({"status": "ok", "data": payload}, safe=False)


@csrf_exempt
def dataset_id(request):
    """Look up a dataset's primary key by name.

    Expects a POST whose JSON body contains ``name``. Returns the id, or null
    data when no dataset by that name exists.
    """
    # Reject every non-POST method (the original only rejected GET).
    if request.method != "POST":
        return JsonResponse({"status": "error", "data": {"reason": "Only post method is accepted."}})

    body = json.loads(request.body.decode("utf-8"))
    dataset_name = body.get("name")

    # The original's `dataset.id if dataset else None` was dead code: .get()
    # raises DoesNotExist rather than returning None, so a missing dataset
    # caused a 500. Catch it and return null data, as clearly intended.
    try:
        dataset = Dataset.objects.get(name=dataset_name)
    except Dataset.DoesNotExist:
        return JsonResponse({"status": "ok", "data": None}, safe=False)
    return JsonResponse({"status": "ok", "data": dataset.id}, safe=False)


@csrf_exempt
def dataset_availability(request, dataset_id):
    """Return, per named category, the number of field-level check occurrences in a dataset.

    Each category (e.g. "awards_items") aggregates one or more JSON paths;
    the counts come from the ``result->'checks'`` JSONB of field-level checks.
    """
    # JSON paths contributing to each category. (Renamed from `map`, which
    # shadowed the builtin.)
    field_paths = {
        "parties": ["parties.id"],
        "plannings": ["planning.budget"],
        "tenders": ["tender.id"],
        "tenderers": ["tenderers.id"],
        "tenders_items": ["tender.items.id"],
        "awards": ["awards.id"],
        "awards_items": ["awards.items.id"],
        "awards_suppliers": ["awards.suppliers.id"],
        "contracts": ["contracts.id"],
        "contracts_items": ["contracts.items.id"],
        "contracts_transactions": ["contracts.implementation.transactions.id"],
        "documents": [
            "planning.documents.id",
            "tender.documents.id",
            "awards.documents.id",
            "contracts.documents.id",
            "contracts.implementation.documents.id",
        ],
        "milestones": [
            "planning.milestones.id",
            "tender.milestones.id",
            "contracts.milestones.id",
            "contracts.implementation.milestones.id",
        ],
        "amendments": ["tender.amendments.id", "awards.amendments.id", "contract.amendments.id"],
    }

    with connections["data"].cursor() as cursor:
        statement = """
            SELECT c.key AS check, SUM(jsonb_array_length(c.value)) AS count
            FROM {table} flc, jsonb_each(flc.result->'checks') c
            WHERE dataset_id = %(dataset_id)s
                AND c.key IN %(checks)s
            GROUP BY c.key
            ORDER BY c.key
        """

        cursor.execute(
            SQL(statement).format(table=Identifier(FieldLevelCheck._meta.db_table)),
            {"checks": tuple(path for paths in field_paths.values() for path in paths), "dataset_id": dataset_id},
        )

        results = cursor.fetchall()

    # Sum per-path counts into categories via one lookup table, instead of the
    # original O(categories x results) nested scan.
    count_by_path = {check: int(count) for check, count in results}
    counts = {key: sum(count_by_path.get(path, 0) for path in paths) for key, paths in field_paths.items()}

    return JsonResponse({"status": "ok", "data": counts}, safe=False)


@csrf_exempt
def dataset_metadata(request, dataset_id):
    """Return the collection metadata stored under the dataset's ``meta`` JSON field."""
    queryset = Dataset.objects.values_list("meta__collection_metadata", flat=True)
    metadata = queryset.get(id=dataset_id)
    return JsonResponse({"status": "ok", "data": metadata}, safe=False)
File renamed without changes.
6 changes: 6 additions & 0 deletions backend/processor/apps.py
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class ProcessorConfig(AppConfig):
    """Django application configuration for the processor app (RabbitMQ/data-registry views)."""

    name = "processor"
    default_auto_field = "django.db.models.BigAutoField"
File renamed without changes.
21 changes: 21 additions & 0 deletions backend/processor/urls.py
@@ -0,0 +1,21 @@
from django.urls import path

from .views import (
create_dataset_filter,
dataset_availability,
dataset_id,
dataset_metadata,
dataset_progress,
dataset_start,
dataset_wipe,
)

# Dataset lifecycle endpoints (start/wipe/status/metadata) served by the processor app.
urlpatterns = [
    path("api/create_dataset_filter", create_dataset_filter, name="create_dataset_filter"),
    # Note: the URL says "dataset_status" but the view is named dataset_progress.
    path("api/dataset_status/<dataset_id>", dataset_progress, name="dataset_status"),
    path("api/dataset_id", dataset_id, name="dataset_id"),
    path("api/dataset_availability/<dataset_id>", dataset_availability, name="dataset_availability"),
    path("api/dataset_metadata/<dataset_id>", dataset_metadata, name="dataset_metadata"),
    path("api/dataset_start", dataset_start, name="dataset_start"),
    path("api/dataset_wipe", dataset_wipe, name="dataset_wipe"),
]

0 comments on commit f01bcaf

Please sign in to comment.