Merge e1a103d into 2fa297e

andrewdhicks committed Mar 21, 2017
2 parents 2fa297e + e1a103d commit f04ec13
Showing 9 changed files with 286 additions and 26 deletions.
42 changes: 42 additions & 0 deletions datacube/index/_datasets.py
@@ -864,6 +864,18 @@ def get_locations(self, id_):
with self._db.connect() as connection:
return connection.get_locations(id_)

def get_archived_locations(self, id_):
"""
:param typing.Union[UUID, str] id_: dataset id
:rtype: list[str]
"""
if isinstance(id_, Dataset):
warnings.warn("Passing dataset is deprecated after 1.2.2, pass dataset.id", DeprecationWarning)
id_ = id_.id

with self._db.connect() as connection:
return connection.get_archived_locations(id_)

def add_location(self, id_, uri):
"""
Add a location to the dataset if it doesn't already exist.
@@ -901,6 +913,36 @@ def remove_location(self, id_, uri):
was_removed = connection.remove_location(id_, uri)
return was_removed

def archive_location(self, id_, uri):
"""
Archive a location of the dataset if it exists.
:param typing.Union[UUID, str] id_: dataset id
:param str uri: fully qualified uri
:return bool: location was able to be archived
"""
if isinstance(id_, Dataset):
warnings.warn("Passing dataset is deprecated after 1.2.2, pass dataset.id", DeprecationWarning)
id_ = id_.id

with self._db.connect() as connection:
was_archived = connection.archive_location(id_, uri)
return was_archived

def restore_location(self, id_, uri):
"""
Un-archive a location of the dataset if it exists.
:param typing.Union[UUID, str] id_: dataset id
:param str uri: fully qualified uri
:return bool: location was able to be restored
"""
if isinstance(id_, Dataset):
warnings.warn("Passing dataset is deprecated after 1.2.2, pass dataset.id", DeprecationWarning)
id_ = id_.id

with self._db.connect() as connection:
was_restored = connection.restore_location(id_, uri)
return was_restored

def _make(self, dataset_res, full_info=False):
"""
:rtype datacube.model.Dataset
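A minimal usage sketch of the location-archiving API added above, assuming an initialised Datacube index and a dataset that already has the given location attached; the dataset id and uri below are illustrative:

from datacube import Datacube

dc = Datacube()  # opens the index with the default configuration
dataset_id = 'f2f07a3e-0000-0000-0000-000000000000'  # illustrative dataset UUID
uri = 'file:///g/data/example/scene.nc'              # illustrative, already-indexed location

# Hide the location from normal lookups without deleting the record.
dc.index.datasets.archive_location(dataset_id, uri)
print(dc.index.datasets.get_locations(dataset_id))           # active locations only
print(dc.index.datasets.get_archived_locations(dataset_id))  # now includes the uri

# Bring it back.
dc.index.datasets.restore_location(dataset_id, uri)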
42 changes: 41 additions & 1 deletion datacube/index/postgres/_api.py
@@ -757,7 +757,21 @@ def get_locations(self, dataset_id):
select([
_dataset_uri_field(DATASET_LOCATION)
]).where(
DATASET_LOCATION.c.dataset_ref == dataset_id
and_(DATASET_LOCATION.c.dataset_ref == dataset_id, DATASET_LOCATION.c.archived == None)
).order_by(
DATASET_LOCATION.c.added.desc()
)
).fetchall()
]

def get_archived_locations(self, dataset_id):
return [
record[0]
for record in self._connection.execute(
select([
_dataset_uri_field(DATASET_LOCATION)
]).where(
and_(DATASET_LOCATION.c.dataset_ref == dataset_id, DATASET_LOCATION.c.archived != None)
).order_by(
DATASET_LOCATION.c.added.desc()
)
@@ -782,6 +796,32 @@ def remove_location(self, dataset_id, uri):
)
return res.rowcount > 0

def archive_location(self, dataset_id, uri):
scheme, body = _split_uri(uri)
res = self._connection.execute(
DATASET_LOCATION.update().where(
DATASET_LOCATION.c.dataset_ref == dataset_id
).where(
DATASET_LOCATION.c.uri_scheme == scheme
).where(
DATASET_LOCATION.c.uri_body == body
).where(
DATASET_LOCATION.c.archived == None
).values(
archived=func.now()
)
)
return res.rowcount > 0

def restore_location(self, dataset_id, uri):
scheme, body = _split_uri(uri)
res = self._connection.execute(
DATASET_LOCATION.update().where(
DATASET_LOCATION.c.dataset_ref == dataset_id
).where(
DATASET_LOCATION.c.uri_scheme == scheme
).where(
DATASET_LOCATION.c.uri_body == body
).where(
DATASET_LOCATION.c.archived != None
).values(
archived=None
)
)
return res.rowcount > 0

def __repr__(self):
return "PostgresDb<connection={!r}>".format(self._connection)

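In the queries and updates above, comparing the archived column with None is the SQLAlchemy idiom for a NULL test: archived == None compiles to archived IS NULL (active locations) and archived != None to archived IS NOT NULL (archived ones). A small self-contained sketch of the same pattern, with an illustrative stand-in for the dataset_location table defined in _schema.py:

import sqlalchemy as sa

metadata = sa.MetaData()
location = sa.Table('dataset_location', metadata,
                    sa.Column('dataset_ref', sa.String),
                    sa.Column('uri_body', sa.String),
                    sa.Column('archived', sa.DateTime(timezone=True)))

active = sa.select([location.c.uri_body]).where(location.c.archived == None)     # noqa: E711
archived = sa.select([location.c.uri_body]).where(location.c.archived != None)   # noqa: E711

print(active)    # ... WHERE dataset_location.archived IS NULL
print(archived)  # ... WHERE dataset_location.archived IS NOT NULL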
16 changes: 16 additions & 0 deletions datacube/index/postgres/tables/_core.py
@@ -113,6 +113,19 @@ def _pg_exists(conn, name):
return conn.execute("SELECT to_regclass(%s)", name).scalar() is not None


def _pg_column_exists(conn, table, column):
"""
Does a postgres column exist?
:rtype bool
"""
return conn.execute("""
select TRUE from pg_attribute
where attrelid = to_regclass(%s)
and attname = %s
and not attisdropped
""", table, column).scalar() is not None


def database_exists(engine):
"""
Have they init'd this database?
@@ -154,6 +167,9 @@ def update_schema(engine):
if not engine.execute("SELECT 1 FROM pg_type WHERE typname = 'float8range'").scalar():
engine.execute(TYPES_INIT_SQL)

if not _pg_column_exists(engine, schema_qualified('dataset_location'), 'archived'):
engine.execute("alter table agdc.dataset_location add column archived TIMESTAMP WITH TIME ZONE")

# Update uri indexes to allow dataset search-by-uri.
if not _pg_exists(engine, schema_qualified('ix_agdc_dataset_location_dataset_ref')):
_LOG.info('Applying uri-search update')
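The migration above is idempotent: _pg_column_exists probes pg_attribute for the column, and the ALTER TABLE only runs when it is missing, so update_schema can safely be re-run against an already-updated database. A rough standalone sketch of the same check-then-alter pattern (the connection URL is illustrative):

from sqlalchemy import create_engine

engine = create_engine('postgresql:///datacube')  # illustrative database URL


def column_exists(conn, table, column):
    # Same probe as _pg_column_exists above: to_regclass resolves the table name,
    # pg_attribute lists its columns, attisdropped excludes dropped columns.
    return conn.execute("""
        select TRUE from pg_attribute
        where attrelid = to_regclass(%s)
        and attname = %s
        and not attisdropped
    """, table, column).scalar() is not None


if not column_exists(engine, 'agdc.dataset_location', 'archived'):
    engine.execute('alter table agdc.dataset_location add column archived TIMESTAMP WITH TIME ZONE')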
3 changes: 3 additions & 0 deletions datacube/index/postgres/tables/_schema.py
@@ -91,6 +91,9 @@
Column('added', DateTime(timezone=True), server_default=func.now(), nullable=False),
Column('added_by', _sql.PGNAME, server_default=func.current_user(), nullable=False),

# Date it was archived. Null for active locations.
Column('archived', DateTime(timezone=True), default=None, nullable=True),

UniqueConstraint('uri_scheme', 'uri_body', 'dataset_ref'),
)

36 changes: 33 additions & 3 deletions datacube/scripts/ingest.py
@@ -20,7 +20,7 @@
from datacube.model.utils import make_dataset, xr_apply, datasets_to_doc
from datacube.storage.storage import write_dataset_to_netcdf
from datacube.ui import click as ui
from datacube.utils import read_documents
from datacube.utils import read_documents, changes
from datacube.ui.task_app import check_existing_files, load_tasks as load_tasks_, save_tasks as save_tasks_

from datacube.ui.click import cli
@@ -46,9 +46,11 @@ def morph_dataset_type(source_type, config):
output_type.definition['name'] = config['output_type']
output_type.definition['managed'] = True
output_type.definition['description'] = config['description']
output_type.definition['storage'] = config['storage']
output_type.metadata_doc['format'] = {'name': 'NetCDF'}

output_type.definition['storage'] = {k: v for (k, v) in config['storage'].items()
if k in ('crs', 'tile_size', 'resolution', 'origin')}

def merge_measurement(measurement, spec):
measurement.update({k: spec.get(k, measurement[k]) for k in ('name', 'nodata', 'dtype')})
return measurement
@@ -122,7 +124,35 @@ def make_output_type(index, config):

output_type = morph_dataset_type(source_type, config)
_LOG.info('Created DatasetType %s', output_type.name)
output_type = index.products.add(output_type)

# Some storage fields should not be in the product definition, and should be removed.
# To handle backwards compatibility for now, ignore them with custom rules,
# rather than using the default checks done by index.products.add
existing = index.products.get_by_name(output_type.name)
backwards_compatible_fields = True
if existing and backwards_compatible_fields:
updates_allowed = {
('description',): changes.allow_any,
('metadata_type',): changes.allow_any,
('storage', 'chunking'): changes.allow_any,
('storage', 'driver'): changes.allow_any,
('storage', 'dimension_order'): changes.allow_any,
('metadata',): changes.allow_truncation
}

doc_changes = changes.get_doc_changes(output_type.definition,
datacube.utils.jsonify_document(existing.definition))
good_changes, bad_changes = changes.classify_changes(doc_changes, updates_allowed)
if bad_changes:
raise ValueError(
'{} differs from stored ({})'.format(
output_type.name,
', '.join(['{}: {!r}!={!r}'.format('.'.join(offset), v1, v2) for offset, v1, v2 in bad_changes])
)
)
output_type = index.products.update(output_type, allow_unsafe_updates=True)
else:
output_type = index.products.add(output_type)

return source_type, output_type

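A toy sketch of the compatibility check in make_output_type above, showing how get_doc_changes and classify_changes split document differences into allowed and disallowed ones; the documents and the single allowed-change rule here are illustrative:

from datacube.utils import changes

stored = {'description': 'old words',
          'storage': {'crs': 'EPSG:3577', 'chunking': {'x': 200, 'y': 200}}}
incoming = {'description': 'new words',
            'storage': {'crs': 'EPSG:4326', 'chunking': {'x': 200, 'y': 200}}}

# Differences come back as (offset, value_in_first_doc, value_in_second_doc) triples,
# e.g. (('description',), 'new words', 'old words').
doc_changes = changes.get_doc_changes(incoming, stored)

updates_allowed = {('description',): changes.allow_any}  # crs changes are not whitelisted
good, bad = changes.classify_changes(doc_changes, updates_allowed)

for offset, v1, v2 in bad:
    print('{}: {!r} != {!r}'.format('.'.join(offset), v1, v2))  # storage.crs differs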
5 changes: 4 additions & 1 deletion datacube/ui/task_app.py
@@ -3,6 +3,7 @@
from datetime import datetime
import logging
import os
import time
import click
import cachetools
import functools
@@ -30,6 +31,7 @@ def load_config(index, app_config_file, make_config, make_tasks, *args, **kwargs):
app_config_path = Path(app_config_file)
_, config = next(read_documents(app_config_path))
config['app_config_file'] = app_config_path.name
config['task_timestamp'] = int(time.time())

config = make_config(index, config, **kwargs)

@@ -194,7 +196,7 @@ def do_nothing(result):
pass


def run_tasks(tasks, executor, run_task, process_result=do_nothing, queue_size=50):
def run_tasks(tasks, executor, run_task, process_result=None, queue_size=50):
"""
:param tasks: iterable of tasks. Usually a generator to create them as required.
:param executor: a datacube executor, similar to `distributed.Client` or `concurrent.futures`
@@ -205,6 +207,7 @@ def run_tasks(tasks, executor, run_task, process_result=do_nothing, queue_size=50):
processed, and how much memory is available to buffer them.
"""
click.echo('Starting processing...')
process_result = process_result or do_nothing
results = []
task_queue = itertools.islice(tasks, queue_size)
for task in task_queue:
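With the run_tasks change above, a caller can now pass process_result=None explicitly (as datacube_apps/ncml.py does below) and still get the do_nothing fallback, because the default is resolved inside the function rather than in the signature. The same pattern in miniature:

def run(items, on_result=None):
    # Resolve the optional callback inside the function, mirroring run_tasks above.
    on_result = on_result or (lambda result: None)
    for item in items:
        on_result(item * 2)


run([1, 2, 3])         # no callback given: falls back to the no-op
run([1, 2, 3], print)  # explicit callback: prints 2, 4, 6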
110 changes: 110 additions & 0 deletions datacube_apps/ncml.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""
Create statistical summaries command
"""
from __future__ import absolute_import, print_function

import logging
import os
from datetime import datetime
from functools import partial

import click
from dateutil import tz
from pathlib import Path

import datacube
from datacube.ui import task_app
from datacube.ui.click import to_pathlib


_LOG = logging.getLogger(__name__)


APP_NAME = 'datacube-ncml'


def get_filename(config, cell_index):
file_path_template = str(Path(config['location'], config['ncml_path_template']))
return file_path_template.format(tile_index=cell_index)


def make_ncml_tasks(index, config, cell_index=None, **kwargs):
product = config['product']

gw = datacube.api.GridWorkflow(index=index, product=product.name)
cells = gw.list_cells(product=product.name, cell_index=cell_index)
for (cell_index, tile) in cells.items():
output_filename = get_filename(config, cell_index)
yield dict(tile=tile,
cell_index=cell_index,
output_filename=output_filename)


def make_ncml_config(index, config, export_path=None, **query):
config['product'] = index.products.get_by_name(config['output_type'])

if not os.access(config['location'], os.W_OK):
_LOG.warning('Current user appears not to have write access to output location: %s', config['location'])
return config


def get_history_attribute(config, task):
return '{dt} {user} {app} ({ver}) {args} # {comment}'.format(
dt=datetime.now(tz.tzlocal()).isoformat(),
user=os.environ.get('USERNAME') or os.environ.get('USER'),
app=APP_NAME,
ver=datacube.__version__,
args=', '.join([config['app_config_file'],
str(config['version']),
task['output_filename'],
str(task['cell_index'])
]),
comment='Created NCML file to aggregate multiple NetCDF files along the time dimension'
)


def do_ncml_task(config, task):
tile = task['tile']
ncml_filename = task['output_filename']

ncml_header = """<netcdf xmlns="http://www.unidata.ucar.edu/namespaces/netcdf/ncml-2.2">
<attribute name='date_created' type='string' value='{date_created}' />
<attribute name='history' type='string' value='{history}' />
<aggregation dimName="time" type="joinExisting">"""

ncml_footer = """
</aggregation>
</netcdf>"""

netcdf_def = """
<netcdf xmlns="http://www.unidata.ucar.edu/namespaces/netcdf/ncml-2.2" location="{path}">
<remove name="dataset" type="variable" />
<remove name="dataset_nchar" type="dimension" />
</netcdf>"""

with open(ncml_filename, 'w') as ncml_file:
ncml_file.write(ncml_header.format(date_created=datetime.today().isoformat(),
history=get_history_attribute(config, task)))
for timeslice_sources in tile.sources.values:
ncml_file.write(netcdf_def.format(path=str(timeslice_sources[0].local_path)))
ncml_file.write(ncml_footer)


@click.command(name=APP_NAME)
@datacube.ui.click.pass_index(app_name=APP_NAME)
@datacube.ui.click.global_cli_options
@click.option('--cell-index', 'cell_index', help='Limit the process to a particular cell (e.g. 14,-11)',
callback=task_app.validate_cell_index, default=None)
@task_app.queue_size_option
@task_app.task_app_options
@task_app.task_app(make_config=make_ncml_config, make_tasks=make_ncml_tasks)
def main(index, config, tasks, executor, queue_size, **kwargs):
click.echo('Starting datacube ncml utility...')

task_func = partial(do_ncml_task, config)
task_app.run_tasks(tasks, executor, task_func, None, queue_size)


if __name__ == '__main__':
main()
