From 8791e1a996fcfc4aa8434e10e044815efa82d38d Mon Sep 17 00:00:00 2001 From: Axel Haustant Date: Wed, 27 Mar 2019 16:28:51 +0100 Subject: [PATCH] Ensure harvest tasks are updated on the right job Fix NEXT-DATAGOUVFR-18QF Fix NEXT-DATAGOUVFR-8W Fix WWW-DATAGOUVFR-19PN Fix WWW-DATAGOUVFR-NS6 --- CHANGELOG.md | 1 + udata/harvest/backends/base.py | 10 +++++--- udata/harvest/tasks.py | 44 ++++++++++++++++++++++++---------- 3 files changed, 40 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1478e56f0f..48a5dd7b84 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ - Prevent client-side error while handling unparseable API response [#2076](https://github.com/opendatateam/udata/pull/2076) - Fix the `udata job schedule` erroneous help message [#2083](https://github.com/opendatateam/udata/pull/2083) - Fix upload button on replace resource file [#2085](https://github.com/opendatateam/udata/pull/2085) +- Ensure harvest items status are updated on the right job [#2089](https://github.com/opendatateam/udata/pull/2089) ## 1.6.5 (2019-02-27) diff --git a/udata/harvest/backends/base.py b/udata/harvest/backends/base.py index 1068fc36de..8d07947f5a 100644 --- a/udata/harvest/backends/base.py +++ b/udata/harvest/backends/base.py @@ -84,9 +84,13 @@ class BaseBackend(object): # This a Sequence[HarvestFeature] features = tuple() - def __init__(self, source, job=None, dryrun=False, max_items=None): - self.source = source - self.job = job + def __init__(self, source_or_job, dryrun=False, max_items=None): + if isinstance(source_or_job, HarvestJob): + self.source = source_or_job.source + self.job = source_or_job + else: + self.source = source_or_job + self.job = None self.dryrun = dryrun self.max_items = max_items diff --git a/udata/harvest/tasks.py b/udata/harvest/tasks.py index 6e73f64d52..b903c063db 100644 --- a/udata/harvest/tasks.py +++ b/udata/harvest/tasks.py @@ -1,13 +1,15 @@ # -*- coding: utf-8 -*- from __future__ import unicode_literals +import warnings + from celery import chord from flask import current_app from udata.tasks import job, get_logger, task from . import backends -from .models import HarvestSource +from .models import HarvestSource, HarvestJob log = get_logger(__name__) @@ -21,9 +23,10 @@ def harvest(self, ident): backend = Backend(source) items = backend.perform_initialization() if items > 0: - finalize = harvest_finalize.s(ident) + finalize = harvest_job_finalize.s(backend.job.id) items = [ - harvest_item.s(ident, item.remote_id) for item in backend.job.items + harvest_job_item.s(backend.job.id, item.remote_id) + for item in backend.job.items ] chord(items)(finalize) elif items == 0: @@ -31,13 +34,12 @@ def harvest(self, ident): @task(ignore_result=False, route='low.harvest') -def harvest_item(source_id, item_id): - log.info('Harvesting item %s for source "%s"', item_id, source_id) +def harvest_job_item(job_id, item_id): + log.info('Harvesting item %s for job "%s"', item_id, job_id) - source = HarvestSource.get(source_id) - job = source.get_last_job() - Backend = backends.get(current_app, source.backend) - backend = Backend(source, job) + job = HarvestJob.objects.get(pk=job_id) + Backend = backends.get(current_app, job.source.backend) + backend = Backend(job) item = filter(lambda i: i.remote_id == item_id, job.items)[0] @@ -45,14 +47,32 @@ def harvest_item(source_id, item_id): return (item_id, result) +@task(ignore_result=False, route='low.harvest') +def harvest_item(source_id, item_id): + log.info('Harvesting item %s for source "%s"', item_id, source_id) + msg = 'harvest_item is deprecated and only here for backward comaptibility' + warnings.warn(msg, DeprecationWarning) + job = HarvestSource.get(source_id).get_last_job() + return harvest_job_item(job.id, item_id) + + +@task(ignore_result=False, route='low.harvest') +def harvest_job_finalize(results, job_id): + log.info('Finalize harvesting for job "%s"', job_id) + job = HarvestJob.objects.get(pk=job_id) + Backend = backends.get(current_app, job.source.backend) + backend = Backend(job) + backend.finalize() + + @task(ignore_result=False, route='low.harvest') def harvest_finalize(results, source_id): log.info('Finalize harvesting for source "%s"', source_id) + msg = 'harvest_item is deprecated and only here for backward comaptibility' + warnings.warn(msg, DeprecationWarning) source = HarvestSource.get(source_id) job = source.get_last_job() - Backend = backends.get(current_app, source.backend) - backend = Backend(source, job) - backend.finalize() + harvest_job_finalize(results, job.id) @task(route='low.harvest')