Commit

Merge c97e8b4 into c5200e2

santoshamohan committed Oct 23, 2018
2 parents c5200e2 + c97e8b4 commit 0d3de4a
Showing 4 changed files with 92 additions and 2 deletions.
3 changes: 1 addition & 2 deletions datacube/api/core.py
@@ -33,7 +33,6 @@
from ..index import index_connect
from ..drivers import new_datasource

_LOG = logging.getLogger(__name__)
THREADING_REQS_AVAILABLE = ('SharedArray' in sys.modules and 'pathos.threading' in sys.modules)

Group = namedtuple('Group', ['key', 'datasets'])
@@ -657,7 +656,7 @@ def fuse_lazy(datasets, geobox, measurement, skip_broken_datasets=False, fuse_fu
def _fuse_measurement(dest, datasets, geobox, measurement,
skip_broken_datasets=False,
fuse_func=None):
reproject_and_fuse([new_datasource(dataset, measurement.name) for dataset in datasets],
reproject_and_fuse([new_datasource(dataset, measurement.name) for dataset in datasets if dataset.uris],
dest,
geobox.affine,
geobox.crs,
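The new `if dataset.uris` guard in `_fuse_measurement` keeps locationless datasets out of `new_datasource()`, the path that can fail with the index-out-of-bounds error this commit addresses when a dataset's `uris` list is empty. A minimal sketch of the effect, using a hypothetical simplified Dataset stand-in rather than the real datacube model:

# A minimal sketch, assuming a simplified Dataset stand-in with only an `id`
# and a `uris` list; the real datacube Dataset carries much more state.
from collections import namedtuple

Dataset = namedtuple('Dataset', ['id', 'uris'])

datasets = [
    Dataset('a1', ['file:///data/scene_a1.nc']),
    Dataset('b2', []),                              # locationless dataset
    Dataset('c3', ['file:///data/scene_c3.nc']),
]

# Equivalent of the `if dataset.uris` guard: only datasets with at least one
# location are handed to new_datasource() / reproject_and_fuse().
usable = [ds for ds in datasets if ds.uris]
assert [ds.id for ds in usable] == ['a1', 'c3']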
6 changes: 6 additions & 0 deletions datacube/scripts/ingest.py
@@ -227,6 +227,12 @@ def ingest_work(config, source_type, output_type, tile, tile_index):

with datacube.set_options(reproject_threads=1):
fuse_func = {'copy': None}[config.get(FUSER_KEY, 'copy')]

datasets = tile.sources.sum().item()
for dataset in datasets:
if not dataset.uris:
_LOG.error('Locationless dataset found in the database: %r', dataset)

data = Datacube.load_data(tile.sources, tile.geobox, measurements,
resampling=resampling,
fuse_func=fuse_func)
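The `ingest_work` change above reports the problem: every locationless dataset in the tile is logged, so an operator can see why the resulting tile may come out incomplete. A small self-contained sketch of that check, using hypothetical stand-in objects (in the real code the datasets come from `tile.sources.sum().item()`):

# A minimal sketch, assuming the same simplified Dataset stand-in as above.
import logging
from collections import namedtuple

logging.basicConfig(level=logging.INFO)
_LOG = logging.getLogger(__name__)

Dataset = namedtuple('Dataset', ['id', 'uris'])

def report_locationless(datasets):
    """Log an error for every dataset whose `uris` list is empty and return them."""
    locationless = [ds for ds in datasets if not ds.uris]
    for ds in locationless:
        _LOG.error('Locationless dataset found in the database: %r', ds)
    return locationless

report_locationless([Dataset('a1', ['file:///data/a1.nc']), Dataset('b2', [])])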
1 change: 1 addition & 0 deletions docs/about/whats_new.rst
@@ -11,6 +11,7 @@ v1.7dev
- Allow specifying different resampling methods for different data variables of
the same Product. (:pull:`551`)
- Bugfixes and improved performance of `dask`-backed arrays (:pull:`547`)
- Bug fix for an index-out-of-bounds error that caused ingestion failures when datasets in the index have no location


v1.6.1 (27 August 2018)
84 changes: 84 additions & 0 deletions integration_tests/test_index_out_of_bound.py
@@ -0,0 +1,84 @@
# coding=utf-8
from __future__ import absolute_import

import pytest

import datacube
from integration_tests.test_end_to_end import INGESTER_CONFIGS
from integration_tests.test_full_ingestion import (check_open_with_api,
ensure_datasets_are_indexed, check_data_shape,
check_grid_mapping, check_cf_compliance, check_attributes,
check_dataset_metadata_in_storage_unit,
check_open_with_xarray)
from integration_tests.utils import prepare_test_ingestion_configuration
import netCDF4


@pytest.mark.timeout(20)
@pytest.mark.parametrize('datacube_env_name', ('datacube',), indirect=True)
@pytest.mark.usefixtures('default_metadata_type',
'indexed_ls5_scene_products')
def test_index_out_of_bound_error(clirunner, index, tmpdir, example_ls5_dataset_paths, ingest_configs):
"""
    Test the case where the ingester processes up to `--queue-size` tasks rather than all of the available scenes
"""
# Make a test ingestor configuration
config = INGESTER_CONFIGS / ingest_configs['ls5_nbar_albers']
config_path, config = prepare_test_ingestion_configuration(tmpdir, None,
config, mode='fast_ingest')

def index_dataset(path):
return clirunner(['dataset', 'add', str(path)])

# Set the queue size to process 5 tiles
queue_size = 5
valid_uuids = []
for uuid, ls5_dataset_path in example_ls5_dataset_paths.items():
valid_uuids.append(uuid)
index_dataset(ls5_dataset_path)

# Ensure that datasets are actually indexed
ensure_datasets_are_indexed(index, valid_uuids)

    # A locationless dataset arises in the database when the sync tool is run
    # (with the --update-location option) after the file on disk has been removed
    # and regenerated with a new dataset id.
for indexed_uuid in valid_uuids:
dc1 = datacube.Datacube(index=index)
datasets = dc1.find_datasets(product='ls5_nbar_scene')
        try:
            # Remove the location from the index to simulate the locationless
            # (index out of bounds) scenario
            res = dc1.index.datasets.remove_location(indexed_uuid, datasets[0].local_uri)
            assert res is True, "Error for %r. output: %r" % (indexed_uuid, res)
        except AttributeError:
            # Removing the location only needs to succeed for one dataset; ignore
            # attribute errors for the rest so `res` is never referenced unset
            pass

    # Ingest the scenes, including the locationless dataset
clirunner([
'ingest',
'--config-file',
str(config_path),
'--queue-size',
queue_size,
'--allow-product-changes',
])

# Validate that the ingestion is working as expected
datasets = index.datasets.search_eager(product='ls5_nbar_albers')
assert len(datasets) > 0
assert datasets[0].managed

check_open_with_api(index, len(valid_uuids))

# NetCDF specific checks, based on the saved NetCDF file
ds_path = str(datasets[0].local_path)
with netCDF4.Dataset(ds_path) as nco:
check_data_shape(nco)
check_grid_mapping(nco)
check_cf_compliance(nco)
check_dataset_metadata_in_storage_unit(nco, example_ls5_dataset_paths)
check_attributes(nco, config['global_attributes'])

name = config['measurements'][0]['name']
check_attributes(nco[name], config['measurements'][0]['attrs'])
check_open_with_xarray(ds_path)
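For reference, the locationless state that this test simulates can also be reproduced directly with the Datacube API. A sketch under the assumption that an index containing at least one `ls5_nbar_scene` dataset with a local uri is already configured; none of the names below come from this test:

# A minimal sketch, assuming a configured datacube index with one ls5_nbar_scene
# dataset; not runnable without that environment.
import datacube

dc = datacube.Datacube()
ds = dc.find_datasets(product='ls5_nbar_scene')[0]

# Remove the dataset's only location; it is now "locationless" but still indexed.
dc.index.datasets.remove_location(ds.id, ds.local_uri)

assert not dc.index.datasets.get(ds.id).uris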
