Commit 4b18898

Merge 6277f49 into 72992af
santoshamohan committed Oct 9, 2018
2 parents 72992af + 6277f49
Showing 4 changed files with 133 additions and 43 deletions.
44 changes: 24 additions & 20 deletions datacube/scripts/ingest.py
@@ -295,44 +295,48 @@ def submit_task(task):

pending = []

# Count of storage unit/s creation successful/failed
nc_successful = nc_failed = 0

# Count of storage unit/s indexed successfully or failed to index
index_successful = index_failed = 0

# Count of storage unit/s failed during file creation
f_failed = 0

tasks = iter(tasks)
pending += [submit_task(task) for task in itertools.islice(tasks, max(0, queue_size - len(pending)))]
total = pending
while pending:
completed, failed, pending = executor.get_ready(pending)

while True:
pending += [submit_task(task) for task in itertools.islice(tasks, queue_size)]
if len(pending) == 0:
break

nc_completed, failed, pending = executor.get_ready(pending)
nc_successful += len(nc_completed)

for future in failed:
try:
executor.result(future)
except Exception as err: # pylint: disable=broad-except
_LOG.exception('Failed to create storage unit file (Exception: %s) ', str(err))
f_failed += 1

_LOG.info('Storage unit file creation status (completed: %s, failed: %s, pending: %s)',
(len(total) - len(pending) - f_failed),
f_failed,
len(pending))
if not completed:
_LOG.exception('Failed to create storage unit file (Exception: %s) ', str(err), exc_info=True)
nc_failed += 1

_LOG.info('Storage unit file creation status (Created_Count: %s, Failed_Count: %s)',
nc_successful,
nc_failed)

if not nc_completed:
time.sleep(1)
continue

try:
# TODO: ideally we wouldn't block here indefinitely
# maybe limit gather to 50-100 results and put the rest into a index backlog
# this will also keep the queue full
results = executor.results(completed)
results = executor.results(nc_completed)
index_successful += _index_datasets(index, results)
except Exception as err: # pylint: disable=broad-except
_LOG.exception('Failed to index storage unit file (Exception: %s)', str(err))
_LOG.exception('Failed to index storage unit file (Exception: %s)', str(err), exc_info=True)
index_failed += 1

_LOG.info('Storage unit files indexed (successful: %s, failed: %s)', index_successful, index_failed)
_LOG.info('Storage unit files indexed (Successful: %s, Failed: %s)', index_successful, index_failed)

return index_successful, index_failed
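
The reworked loop above keeps pulling from the lazy task stream in `--queue-size` batches until it is exhausted, counting file-creation and indexing outcomes separately, instead of stopping once the initially submitted batch drains. A rough, self-contained sketch of that producer/consumer pattern, using concurrent.futures rather than datacube's own executor API (do_work, queue_size and the counters here are illustrative):

import itertools
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

def process_all(tasks, do_work, queue_size=2):
    # Keep roughly queue_size tasks in flight; refill from the (lazy) iterator each pass.
    tasks = iter(tasks)
    pending, successful, failed = [], 0, 0
    with ThreadPoolExecutor(max_workers=queue_size) as pool:
        while True:
            pending += [pool.submit(do_work, task)
                        for task in itertools.islice(tasks, max(0, queue_size - len(pending)))]
            if not pending:
                break  # iterator exhausted and nothing left in flight
            done, not_done = wait(pending, return_when=FIRST_COMPLETED)
            pending = list(not_done)
            for future in done:
                try:
                    future.result()
                    successful += 1
                except Exception:  # broad on purpose, mirroring the loop's accounting
                    failed += 1
    return successful, failed

# e.g. process_all(range(5), lambda x: x * x) == (5, 0)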

@@ -381,8 +385,8 @@ def ingest_cmd(index,
save_tasks,
load_tasks,
dry_run,
executor,
allow_product_changes):
allow_product_changes,
executor):
# pylint: disable=too-many-locals

if config_file:
2 changes: 1 addition & 1 deletion datacube/ui/task_app.py
@@ -73,7 +73,7 @@ def save_tasks(config, tasks, taskfile):
os.remove(taskfile)
return 0
else:
_LOG.info('Saved config and %d tasks to %s', i, taskfile)
_LOG.info('Saved config and %d tasks to %s', i - 1, taskfile)
return i - 1
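
The one-line change above brings the log message into line with the return value: the task file is written as a pickle stream whose first item is the config, so the running counter i is one greater than the number of tasks saved. A minimal sketch of that convention, with pickle_stream redefined locally for illustration (it may differ from the project's actual helper):

import itertools
import pickle

def pickle_stream(objs, filename):
    # Serialise a stream of objects to one file, returning how many were written.
    i = 0
    with open(filename, 'wb') as stream:
        for i, obj in enumerate(objs, start=1):
            pickle.dump(obj, stream)
    return i

def save_tasks_sketch(config, tasks, taskfile):
    # The config goes in first, so i counts config + tasks; tasks saved = i - 1.
    i = pickle_stream(itertools.chain([config], tasks), taskfile)
    return max(i - 1, 0)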


71 changes: 49 additions & 22 deletions integration_tests/test_double_ingestion.py
@@ -1,9 +1,14 @@
from pathlib import Path

import pytest
import netCDF4

from integration_tests.data_utils import generate_test_scenes
from integration_tests.utils import prepare_test_ingestion_configuration
from integration_tests.test_full_ingestion import (check_open_with_api, check_data_with_api,
ensure_datasets_are_indexed, check_data_shape,
check_grid_mapping, check_cf_compliance, check_attributes,
check_dataset_metadata_in_storage_unit,
check_open_with_xarray)

PROJECT_ROOT = Path(__file__).parents[1]

@@ -13,7 +18,7 @@
@pytest.mark.parametrize('datacube_env_name', ('datacube',), indirect=True)
@pytest.mark.usefixtures('default_metadata_type',
'indexed_ls5_scene_products')
def test_double_ingestion(clirunner, index, tmpdir, ingest_configs):
def test_double_ingestion(clirunner, index, tmpdir, ingest_configs, example_ls5_dataset_paths):
"""
Test for the case where ingestor does not need to create a new product,
but should re-use an existing target product.
Expand All @@ -27,26 +32,48 @@ def test_double_ingestion(clirunner, index, tmpdir, ingest_configs):
def index_dataset(path):
return clirunner(['dataset', 'add', str(path)])

# Create and Index some example scene datasets
dataset_paths = generate_test_scenes(tmpdir)
for path in dataset_paths:
index_dataset(path)
def ingest_products():
valid_uuids = []
for uuid, ls5_dataset_path in example_ls5_dataset_paths.items():
valid_uuids.append(uuid)
index_dataset(ls5_dataset_path)

# Ensure that datasets are actually indexed
ensure_datasets_are_indexed(index, valid_uuids)

# Ingest them
clirunner([
'ingest',
'--config-file',
str(config_path)
])

# Validate that the ingestion is working as expected
datasets = index.datasets.search_eager(product='ls5_nbar_albers')
assert len(datasets) > 0
assert datasets[0].managed

# Ingest them
clirunner([
'ingest',
'--config-file',
str(config_path)
])
check_open_with_api(index, len(valid_uuids))
check_data_with_api(index, len(valid_uuids))

# NetCDF specific checks, based on the saved NetCDF file
ds_path = str(datasets[0].local_path)
with netCDF4.Dataset(ds_path) as nco:
check_data_shape(nco)
check_grid_mapping(nco)
check_cf_compliance(nco)
check_dataset_metadata_in_storage_unit(nco, example_ls5_dataset_paths)
check_attributes(nco, config['global_attributes'])

name = config['measurements'][0]['name']
check_attributes(nco[name], config['measurements'][0]['attrs'])
check_open_with_xarray(ds_path)

# Create and Index some example scene datasets
ingest_products()

######################
# Double Ingestion #
######################
# Create and Index some more scene datasets
dataset_paths = generate_test_scenes(tmpdir)
for path in dataset_paths:
index_dataset(path)

# Make sure that we can ingest the new scenes
clirunner([
'ingest',
'--config-file',
str(config_path)
])
ingest_products()
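
Both ingestion tests drive the CLI through the clirunner fixture, i.e. in-process via Click's test runner rather than a subprocess. A minimal sketch of how such a fixture can work (the ingest command below is a stand-in defined here for illustration, not the project's real subcommand or fixture wiring):

import click
from click.testing import CliRunner

@click.command()
@click.option('--config-file', type=click.Path())
@click.option('--queue-size', type=int, default=None)
@click.option('--allow-product-changes', is_flag=True)
def ingest(config_file, queue_size, allow_product_changes):
    """Stand-in for the real ingest subcommand."""
    click.echo('ingest %s (queue_size=%s)' % (config_file, queue_size))

def clirunner_sketch(args):
    # Invoke the command in-process and fail fast on a non-zero exit code,
    # mirroring what the test fixture is assumed to do.
    result = CliRunner().invoke(ingest, [str(a) for a in args], catch_exceptions=False)
    assert result.exit_code == 0, result.output
    return result

# e.g. clirunner_sketch(['--config-file', 'ls5_nbar_albers.yaml', '--queue-size', 2])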
59 changes: 59 additions & 0 deletions integration_tests/test_full_ingestion.py
@@ -36,6 +36,7 @@
COMPLIANCE_CHECKER_NORMAL_LIMIT = 2


@pytest.mark.timeout(20)
@pytest.mark.parametrize('datacube_env_name', ('datacube',), indirect=True)
@pytest.mark.usefixtures('default_metadata_type',
'indexed_ls5_scene_products')
@@ -121,6 +122,64 @@ def test_s3_full_ingestion(clirunner, index, tmpdir, example_ls5_dataset_paths,
check_data_with_api(index, len(valid_uuids))


@pytest.mark.timeout(20)
@pytest.mark.parametrize('datacube_env_name', ('datacube',), indirect=True)
@pytest.mark.usefixtures('default_metadata_type',
'indexed_ls5_scene_products')
def test_process_all_ingest_jobs(clirunner, index, tmpdir, example_ls5_dataset_paths, ingest_configs):
"""
Test for the case where the ingestor processes up to `--queue-size` tasks at a time, rather than all available scenes at once
"""
# Make a test ingestor configuration
config = INGESTER_CONFIGS / ingest_configs['ls5_nbar_albers']
config_path, config = prepare_test_ingestion_configuration(tmpdir, None,
config, mode='fast_ingest')

def index_dataset(path):
return clirunner(['dataset', 'add', str(path)])

# Number of scenes generated is 3 (as per NUM_TIME_SLICES const from conftest.py)
# Set the queue size to process 2 tiles
queue_size = 2
valid_uuids = []
for uuid, ls5_dataset_path in example_ls5_dataset_paths.items():
valid_uuids.append(uuid)
index_dataset(ls5_dataset_path)

# Ensure that datasets are actually indexed
ensure_datasets_are_indexed(index, valid_uuids)

# Ingest all scenes (Though the queue size is 2, all 3 tiles will be ingested)
clirunner([
'ingest',
'--config-file',
str(config_path),
'--queue-size',
queue_size,
'--allow-product-changes',
])

# Validate that the ingestion is working as expected
datasets = index.datasets.search_eager(product='ls5_nbar_albers')
assert len(datasets) > 0
assert datasets[0].managed

check_open_with_api(index, len(valid_uuids))

# NetCDF specific checks, based on the saved NetCDF file
ds_path = str(datasets[0].local_path)
with netCDF4.Dataset(ds_path) as nco:
check_data_shape(nco)
check_grid_mapping(nco)
check_cf_compliance(nco)
check_dataset_metadata_in_storage_unit(nco, example_ls5_dataset_paths)
check_attributes(nco, config['global_attributes'])

name = config['measurements'][0]['name']
check_attributes(nco[name], config['measurements'][0]['attrs'])
check_open_with_xarray(ds_path)


def ensure_datasets_are_indexed(index, valid_uuids):
datasets = index.datasets.search_eager(product='ls5_nbar_scene')
assert len(datasets) == len(valid_uuids)
