Skip to content

Commit

Permalink
Merge 228a4d9 into 419486b
Browse files Browse the repository at this point in the history
  • Loading branch information
santoshamohan committed Jul 23, 2018
2 parents 419486b + 228a4d9 commit bd8a8fe
Showing 1 changed file with 13 additions and 9 deletions.
22 changes: 13 additions & 9 deletions datacube/scripts/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,23 +285,25 @@ def submit_task(task):

pending = []
n_successful = n_failed = 0
f_failed = 0

tasks = iter(tasks)
while True:
pending += [submit_task(task) for task in itertools.islice(tasks, max(0, queue_size - len(pending)))]
if not pending:
break

pending += [submit_task(task) for task in itertools.islice(tasks, max(0, queue_size - len(pending)))]
total = pending
while pending:
completed, failed, pending = executor.get_ready(pending)
_LOG.info('completed %s, failed %s, pending %s', len(completed), len(failed), len(pending))

for future in failed:
try:
executor.result(future)
except Exception: # pylint: disable=broad-except
_LOG.exception('Task failed')
n_failed += 1
f_failed += 1

_LOG.info('Storage unit creation (completed: %s, failed: %s, pending: %s)',
(len(total) - len(pending) - f_failed),
f_failed,
len(pending))
if not completed:
time.sleep(1)
continue
Expand All @@ -313,8 +315,10 @@ def submit_task(task):
results = executor.results(completed)
n_successful += _index_datasets(index, results)
except Exception as e: # pylint: disable=broad-except
_LOG.exception('Gather failed')
pending += completed
_LOG.exception('Gather failed during indexing')
n_failed += 1

_LOG.info('Index storage unit (successful: %s, failed: %s)', n_successful, n_failed)

return n_successful, n_failed

Expand Down

0 comments on commit bd8a8fe

Please sign in to comment.