Skip to content

Commit

Permalink
Ensure harvest tasks are updated on the right job
Browse files Browse the repository at this point in the history
Fix NEXT-DATAGOUVFR-18QF
Fix NEXT-DATAGOUVFR-8W
Fix WWW-DATAGOUVFR-19PN
Fix WWW-DATAGOUVFR-NS6
  • Loading branch information
noirbizarre committed Mar 27, 2019
1 parent b044a10 commit 8791e1a
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -13,6 +13,7 @@
- Prevent client-side error while handling unparseable API response [#2076](https://github.com/opendatateam/udata/pull/2076)
- Fix the `udata job schedule` erroneous help message [#2083](https://github.com/opendatateam/udata/pull/2083)
- Fix upload button on replace resource file [#2085](https://github.com/opendatateam/udata/pull/2085)
- Ensure harvest items status are updated on the right job [#2089](https://github.com/opendatateam/udata/pull/2089)

## 1.6.5 (2019-02-27)

Expand Down
10 changes: 7 additions & 3 deletions udata/harvest/backends/base.py
Expand Up @@ -84,9 +84,13 @@ class BaseBackend(object):
# This a Sequence[HarvestFeature]
features = tuple()

def __init__(self, source, job=None, dryrun=False, max_items=None):
self.source = source
self.job = job
def __init__(self, source_or_job, dryrun=False, max_items=None):
if isinstance(source_or_job, HarvestJob):
self.source = source_or_job.source
self.job = source_or_job
else:
self.source = source_or_job
self.job = None
self.dryrun = dryrun
self.max_items = max_items

Expand Down
44 changes: 32 additions & 12 deletions udata/harvest/tasks.py
@@ -1,13 +1,15 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import warnings

from celery import chord
from flask import current_app

from udata.tasks import job, get_logger, task

from . import backends
from .models import HarvestSource
from .models import HarvestSource, HarvestJob

log = get_logger(__name__)

Expand All @@ -21,38 +23,56 @@ def harvest(self, ident):
backend = Backend(source)
items = backend.perform_initialization()
if items > 0:
finalize = harvest_finalize.s(ident)
finalize = harvest_job_finalize.s(backend.job.id)
items = [
harvest_item.s(ident, item.remote_id) for item in backend.job.items
harvest_job_item.s(backend.job.id, item.remote_id)
for item in backend.job.items
]
chord(items)(finalize)
elif items == 0:
backend.finalize()


@task(ignore_result=False, route='low.harvest')
def harvest_item(source_id, item_id):
log.info('Harvesting item %s for source "%s"', item_id, source_id)
def harvest_job_item(job_id, item_id):
log.info('Harvesting item %s for job "%s"', item_id, job_id)

source = HarvestSource.get(source_id)
job = source.get_last_job()
Backend = backends.get(current_app, source.backend)
backend = Backend(source, job)
job = HarvestJob.objects.get(pk=job_id)
Backend = backends.get(current_app, job.source.backend)
backend = Backend(job)

item = filter(lambda i: i.remote_id == item_id, job.items)[0]

result = backend.process_item(item)
return (item_id, result)


@task(ignore_result=False, route='low.harvest')
def harvest_item(source_id, item_id):
log.info('Harvesting item %s for source "%s"', item_id, source_id)
msg = 'harvest_item is deprecated and only here for backward comaptibility'
warnings.warn(msg, DeprecationWarning)
job = HarvestSource.get(source_id).get_last_job()
return harvest_job_item(job.id, item_id)


@task(ignore_result=False, route='low.harvest')
def harvest_job_finalize(results, job_id):
log.info('Finalize harvesting for job "%s"', job_id)
job = HarvestJob.objects.get(pk=job_id)
Backend = backends.get(current_app, job.source.backend)
backend = Backend(job)
backend.finalize()


@task(ignore_result=False, route='low.harvest')
def harvest_finalize(results, source_id):
log.info('Finalize harvesting for source "%s"', source_id)
msg = 'harvest_item is deprecated and only here for backward comaptibility'
warnings.warn(msg, DeprecationWarning)
source = HarvestSource.get(source_id)
job = source.get_last_job()
Backend = backends.get(current_app, source.backend)
backend = Backend(source, job)
backend.finalize()
harvest_job_finalize(results, job.id)


@task(route='low.harvest')
Expand Down

0 comments on commit 8791e1a

Please sign in to comment.