Skip to content

Commit

Permalink
feat: update kingfisher process URLs and use metadata endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
yolile committed Apr 22, 2024
1 parent d9ea195 commit aa9584f
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 44 deletions.
36 changes: 1 addition & 35 deletions data_registry/process_manager/process.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from datetime import date, datetime, timedelta
from datetime import date, timedelta
from urllib.parse import urljoin

from django.conf import settings
Expand Down Expand Up @@ -129,8 +129,6 @@ def process(collection):
# completed job postprocessing
try:
update_collection_availability(job)

update_collection_metadata(job)
except Exception as e:
logger.exception(e)
else:
Expand Down Expand Up @@ -214,35 +212,3 @@ def update_collection_availability(job):
job.milestones_count = counts.get("milestones")
job.amendments_count = counts.get("amendments")
job.save()


def update_collection_metadata(job):
try:
pelican_id = job.context.get("pelican_id")
response = request("GET", urljoin(settings.PELICAN_FRONTEND_URL, f"/api/datasets/{pelican_id}/metadata/"))
except Exception as e:
raise Exception(
f"Publication {job.collection}: Pelican: Unable to get metadata of dataset {pelican_id}"
) from e

meta = response.json()

if meta:
job.date_from = parse_date(meta.get("published_from"))
job.date_to = parse_date(meta.get("published_to"))
job.license = meta.get("data_license") or ""
job.ocid_prefix = meta.get("ocid_prefix") or ""
job.save()


def parse_date(datetime_str):
if not datetime_str:
return None

try:
try:
return datetime.strptime(datetime_str, "%Y-%m-%d %H.%M.%S").date()
except ValueError: # e.g. nigeria_plateau_state
return datetime.strptime(datetime_str, "%y-%m-%d %H.%M.%S").date()
except ValueError as e:
logger.exception(e)
24 changes: 19 additions & 5 deletions data_registry/process_manager/task/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

logger = logging.getLogger(__name__)

base_url = "/api/collections"


class Process:
job = None
Expand All @@ -24,7 +26,7 @@ def run(self):
def get_status(self):
response = request(
"GET",
urljoin(settings.KINGFISHER_PROCESS_URL, f"/api/v1/tree/{self.process_id}/"),
urljoin(settings.KINGFISHER_PROCESS_URL, f"{base_url}/{self.process_id}/tree/"),
error_msg=f"Unable to get status of process #{self.process_id}",
)

Expand All @@ -38,6 +40,20 @@ def get_status(self):
self.job.context["process_data_version"] = compile_releases.get("data_version")
self.job.save()

if is_last_completed:
response = request(
"GET",
urljoin(settings.KINGFISHER_PROCESS_URL, f"{base_url}/{self.process_id}/metadata/"),
error_msg=f"Unable to get the metadata of process #{self.process_id}",
)
meta = response.json()
if meta:
self.job.date_from = meta.get("published_from")
self.job.date_to = meta.get("published_to")
self.job.license = meta.get("data_license") or ""
self.job.ocid_prefix = meta.get("ocid_prefix") or ""
self.job.save()

return Task.Status.COMPLETED if is_last_completed else Task.Status.RUNNING

def wipe(self):
Expand All @@ -47,9 +63,7 @@ def wipe(self):

logger.info("Wiping Kingfisher Process data for collection id %s.", self.process_id)
request(
"POST",
urljoin(settings.KINGFISHER_PROCESS_URL, "/api/v1/wipe_collection"),
json={"collection_id": self.process_id},
"DELETE",
urljoin(settings.KINGFISHER_PROCESS_URL, f"{base_url}/{self.process_id}/"),
error_msg="Unable to wipe PROCESS",
consume_exception=True,
)
5 changes: 1 addition & 4 deletions tests/data_registry/process_manager/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,12 @@ def test(self):

with patch("data_registry.process_manager.process.get_runner") as mock_get_runner, patch(
"data_registry.process_manager.process.update_collection_availability"
) as mock_update_collection_availability, patch(
"data_registry.process_manager.process.update_collection_metadata"
) as mock_update_collection_metadata:
) as mock_update_collection_availability:
# get_runner returns only TestTask
mock_get_runner.return_value = TestTask()
# skip update_collection_availability (does nothing, counts are not set!)
mock_update_collection_availability.return_value = None
# skip update_collection_metadaat (does nothing, metadata are not set!)
mock_update_collection_metadata.return_value = None

settings.JOB_TASKS_PLAN = ["test"]

Expand Down

0 comments on commit aa9584f

Please sign in to comment.