Update code in response to code review for PR #234
andrewdhicks committed May 16, 2017
1 parent 817dab2 commit 4363a14
Showing 5 changed files with 69 additions and 45 deletions.
6 changes: 6 additions & 0 deletions datacube/ui/task_app.py
@@ -155,6 +155,12 @@ def validate_year(ctx, param, value):
'or as an inclusive range (eg 1996-2001)')


def break_query_into_years(time_query, **kwargs):
    if time_query is None:
        return [kwargs]
    return [dict(kwargs, time=time_range) for time_range in year_splitter(*time_query)]


def year_splitter(start, end):
"""
Produces a list of time ranges that represent each year in the range.
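To make the new helper concrete, here is a minimal usage sketch. It assumes `year_splitter` yields ('YYYY-01-01', 'YYYY-12-31') string pairs, one per calendar year (its body is truncated in this hunk), and the `cell_index` keyword argument is purely illustrative.

from datacube.ui import task_app

# One query dict per year, each carrying the extra keyword arguments through.
queries = task_app.break_query_into_years((1996, 1998), cell_index=(15, -40))
# -> [{'cell_index': (15, -40), 'time': ('1996-01-01', '1996-12-31')},
#     {'cell_index': (15, -40), 'time': ('1997-01-01', '1997-12-31')},
#     {'cell_index': (15, -40), 'time': ('1998-01-01', '1998-12-31')}]

# With no time query, a single unrestricted query is returned.
task_app.break_query_into_years(None, cell_index=(15, -40))
# -> [{'cell_index': (15, -40)}]
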
17 changes: 17 additions & 0 deletions datacube_apps/ncml.py
@@ -176,6 +176,10 @@ def ncml_app():
@click.argument('app_config')
@task_app.task_app(make_config=make_ncml_config, make_tasks=make_ncml_tasks)
def full(index, config, tasks, executor, queue_size, **kwargs):
"""Create ncml files for the full time depth of the product
e.g. datacube-ncml full <app_config_yaml>
"""
click.echo('Starting datacube ncml utility...')

task_func = partial(do_ncml_task, config)
@@ -188,6 +192,13 @@ def full(index, config, tasks, executor, queue_size, **kwargs):
@click.argument('nested_years', nargs=-1, type=click.INT)
@task_app.task_app(make_config=make_ncml_config, make_tasks=make_ncml_tasks)
def nest(index, config, tasks, executor, queue_size, **kwargs):
"""Create ncml files for the full time, with nested ncml files covering the given years
e.g. datacube-ncml nest <app_config_yaml> 2016 2017
This will refer to the actual files (hopefully stacked), and make ncml files for the given (i.e. unstacked) years.
Use the `update` command when new data is added to a year, without having to rerun for the entire time depth.
"""
click.echo('Starting datacube ncml utility...')

task_func = partial(do_ncml_task, config)
@@ -200,6 +211,12 @@ def nest(index, config, tasks, executor, queue_size, **kwargs):
@click.argument('year', type=click.INT)
@task_app.task_app(make_config=make_ncml_config, make_tasks=make_ncml_tasks)
def update(index, config, tasks, executor, queue_size, **kwargs):
"""Update a single year ncml file
e.g. datacube-ncml update <app_config_yaml> 1996
This can be used to update an existing ncml file created with `nest` when new data is added.
"""
click.echo('Starting datacube ncml utility...')

task_func = partial(do_ncml_task, config)
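For orientation, the three subcommands above hang off the `ncml_app` entry point named in the first hunk header. A hedged sketch of invoking them programmatically follows; the group/command decorators sit outside the hunks shown, so treating `ncml_app` as a click group is an assumption, and the config file names are made up.

from click.testing import CliRunner
from datacube_apps.ncml import ncml_app  # assumed to be the click group

runner = CliRunner()
runner.invoke(ncml_app, ['full', 'app_config.yaml'])                  # whole time depth
runner.invoke(ncml_app, ['nest', 'app_config.yaml', '2016', '2017'])  # nested years
runner.invoke(ncml_app, ['update', 'app_config.yaml', '1996'])        # refresh one year
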
51 changes: 26 additions & 25 deletions datacube_apps/stacker/fixer.py
@@ -1,5 +1,7 @@
"""
Create time-stacked NetCDF files.
Finds single-timeslice files that have not been stacked (based on filename), and rewrites them.
This tool is used to update NetCDF metadata for files that are not picked up by the stacker.
"""
from __future__ import absolute_import, print_function
@@ -37,9 +39,9 @@
APP_NAME = 'datacube-fixer'


def get_filename(config, cell_index, year):
def make_filename(config, cell_index, start_time):
file_path_template = str(Path(config['location'], config['file_path_template']))
return file_path_template.format(tile_index=cell_index, start_time=year, version=config['taskfile_version'])
return file_path_template.format(tile_index=cell_index, start_time=start_time, version=config['taskfile_version'])


def get_temp_file(final_output_path):
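
As a worked illustration of the renamed helper: the placeholder names below are implied by the format() call in the diff, but the template string and values are hypothetical.

from pathlib import Path

config = {
    'location': '/g/data/fk4/output',  # hypothetical
    'file_path_template': 'LS_FC_{tile_index[0]}_{tile_index[1]}_{start_time}_v{version}.nc',  # hypothetical
    'taskfile_version': 1496110000,    # hypothetical
}

def make_filename(config, cell_index, start_time):
    file_path_template = str(Path(config['location'], config['file_path_template']))
    return file_path_template.format(tile_index=cell_index, start_time=start_time,
                                     version=config['taskfile_version'])

make_filename(config, (15, -40), '20160101000000000000')
# '/g/data/fk4/output/LS_FC_15_-40_20160101000000000000_v1496110000.nc'
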
@@ -73,36 +75,28 @@ def get_single_dataset_paths(cell):
return files_to_fix


def make_fixer_tasks(index, config, **kwargs):
# Find tiles with bad metadata
# In this case, they will be single timeslices
# Typically for particular years: the first and last for each product
# But for 2016, nothing will be stacked
# So we could do it based on NetCDF properties, and for all single-dataset locations
# Tile -> locations -> unique locations -> tile
# Just regex the filenames

query = {kw: arg for kw, arg in kwargs.items() if kw in ['cell_index'] and arg is not None}
def make_fixer_tasks(index, config, time=None, cell_index=None, **kwargs):
"""Find datasets that have a location not shared by other datasets and make it into a task
"""
gw = datacube.api.GridWorkflow(index=index, product=config['product'].name)

time_query_list = task_app.year_splitter(*kwargs['time']) if 'time' in kwargs else [None]
for query in task_app.break_query_into_years(time):
cells = gw.list_cells(product=config['product'].name, cell_index=cell_index, **query)

for time_query in time_query_list:
cells = gw.list_cells(product=config['product'].name, time=time_query, **query)
for cell_index, cell in cells.items():
for cell_index_key, cell in cells.items():
files_to_fix = get_single_dataset_paths(cell)
if files_to_fix:
for time, tile in cell.split('time'):
source_path = tile.sources.values.item()[0].local_path
if source_path in files_to_fix:
tile = gw.update_tile_lineage(tile)
start_time = '{0:%Y%m%d%H%M%S%f}'.format(pd.Timestamp(time).to_datetime())
output_filename = get_filename(config, cell_index, start_time)
output_filename = make_filename(config, cell_index_key, start_time)
_LOG.info('Fixing required for: time=%s, cell=%s. Output=%s',
start_time, cell_index, output_filename)
start_time, cell_index_key, output_filename)
yield dict(start_time=time,
tile=tile,
cell_index=cell_index,
cell_index=cell_index_key,
output_filename=output_filename)


@@ -135,14 +129,18 @@ def make_fixer_config(index, config, export_path=None, **query):
return config


def get_history_attribute(config, task):
def build_history_string(config, task, keep_original=True):
tile = task['tile']
input_path = str(tile.sources[0].item()[0].local_path)
# original_dataset = xr.open_dataset(input_path)
# original_history = original_dataset.attrs.get('history', '')
original_history = 'Original file at {}'.format(input_path)
if keep_original:
original_dataset = xr.open_dataset(input_path)
original_history = original_dataset.attrs.get('history', '')
else:
original_history = 'Original file at {}'.format(input_path)

if original_history:
original_history += '\n'

new_history = '{dt} {user} {app} ({ver}) {args} # {comment}'.format(
dt=datetime.datetime.now(tz.tzlocal()).isoformat(),
user=os.environ.get('USERNAME') or os.environ.get('USER'),
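
The hunk above is truncated before the remaining format() arguments, which come from config. A self-contained sketch of the history string being assembled, with illustrative values standing in for the pieces not shown:

import datetime
import os
from dateutil import tz

def sketch_history(input_path, app='datacube-fixer', ver='1.3.2',
                   args='--app-config fc_config.yaml', comment='fixing metadata'):
    # keep_original=False branch: record only where the original file lived
    original_history = 'Original file at {}\n'.format(input_path)
    new_history = '{dt} {user} {app} ({ver}) {args} # {comment}'.format(
        dt=datetime.datetime.now(tz.tzlocal()).isoformat(),
        user=os.environ.get('USERNAME') or os.environ.get('USER'),
        app=app, ver=ver, args=args, comment=comment)
    return original_history + new_history
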
@@ -166,7 +164,9 @@ def _unwrap_dataset_list(labels, dataset_list):

def do_fixer_task(config, task):
global_attributes = config['global_attributes']
global_attributes['history'] = get_history_attribute(config, task)

# Don't keep the original history if we are trying to fix it
global_attributes['history'] = build_history_string(config, task, keep_original=False)

variable_params = config['variable_params']

@@ -262,6 +262,7 @@ def process_result(index, result):
@task_app.task_app_options
@task_app.task_app(make_config=make_fixer_config, make_tasks=make_fixer_tasks)
def fixer(index, config, tasks, executor, queue_size, **kwargs):
"""This script rewrites unstacked dataset files to correct their NetCDF metadata."""
click.echo('Starting fixer utility...')

task_func = partial(do_fixer_task, config)
25 changes: 11 additions & 14 deletions datacube_apps/stacker/stacker.py
@@ -61,31 +61,27 @@ def get_temp_file(final_output_path):
return tmp_path


def make_stacker_tasks(index, config, **kwargs):
product = config['product']
query = {kw: arg for kw, arg in kwargs.items() if kw in ['cell_index'] and arg is not None}
def make_stacker_tasks(index, config, cell_index=None, time=None, **kwargs):
gw = datacube.api.GridWorkflow(index=index, product=config['product'].name)

gw = datacube.api.GridWorkflow(index=index, product=product.name)

time_query_list = task_app.year_splitter(*kwargs['time']) if 'time' in kwargs else [None]

for time_query in time_query_list:
cells = gw.list_cells(product=product.name, time=time_query, **query)
for (cell_index, tile) in cells.items():
for query in task_app.break_query_into_years(time):
cells = gw.list_cells(product=config['product'].name, cell_index=cell_index, **query)
for (cell_index_key, tile) in cells.items():
for (year, year_tile) in tile.split_by_time(freq='A'):
storage_files = set(ds.local_path for ds in itertools.chain(*year_tile.sources.values))
if len(storage_files) > 1:
year_tile = gw.update_tile_lineage(year_tile)
output_filename = get_filename(config, cell_index, year)
_LOG.info('Stacking required for: year=%s, cell=%s. Output=%s', year, cell_index, output_filename)
output_filename = get_filename(config, cell_index_key, year)
_LOG.info('Stacking required for: year=%s, cell=%s. Output=%s',
year, cell_index_key, output_filename)
yield dict(year=year,
tile=year_tile,
cell_index=cell_index,
cell_index=cell_index_key,
output_filename=output_filename)
elif len(storage_files) == 1:
[only_filename] = storage_files
_LOG.info('Stacking not required for: year=%s, cell=%s. existing=%s',
year, cell_index, only_filename)
year, cell_index_key, only_filename)


def make_stacker_config(index, config, export_path=None, **query):
@@ -236,6 +232,7 @@ def process_result(index, result):
@task_app.task_app_options
@task_app.task_app(make_config=make_stacker_config, make_tasks=make_stacker_tasks)
def main(index, config, tasks, executor, queue_size, **kwargs):
"""This script creates NetCDF files containing an entire year of tiles in the same file."""
click.echo('Starting stacking utility...')

task_func = partial(do_stack_task, config)
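To spell out the decision in make_stacker_tasks: a cell/year pair needs stacking only when its datasets are spread over more than one NetCDF file. A small sketch with made-up paths (the real code derives the set from `ds.local_path` over `year_tile.sources.values`):

import itertools

# Each entry mimics the dataset tuple for one timeslice; two distinct
# single-timeslice files here mean stacking is required.
year_dataset_files = [
    ('/data/LS_FC_15_-40_20160101.nc',),
    ('/data/LS_FC_15_-40_20160117.nc',),
]
storage_files = set(itertools.chain(*year_dataset_files))
needs_stacking = len(storage_files) > 1   # True -> yield a stacking task
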
15 changes: 9 additions & 6 deletions tests/api/test_grid_workflow.py
@@ -126,14 +126,13 @@ def search_eager(lat=None, lon=None, **kwargs):
assert len(padded_tile[1, -2, ti].sources.values[0]) == 2


def test_cell():
""" Test GridWorkflow with padding option. """
def test_gridworkflow_with_time_depth():
"""Test GridWorkflow with time series.
Also test the `Tile` methods `split` and `split_by_time`.
"""
from mock import MagicMock
import datetime

# ----- fake a datacube -----
# e.g. let there be a dataset that coincides with a grid cell

fakecrs = geometry.CRS('EPSG:4326')

grid = 100 # spatial frequency in crs units
@@ -156,16 +155,20 @@ def make_fake_datasets(num_datasets):
fakeindex.datasets.get_field_names.return_value = ['time'] # permit query on time
fakeindex.datasets.search_eager.return_value = list(make_fake_datasets(100))

# ------ test without padding ----
# ------ test with time dimension ----

from datacube.api.grid_workflow import GridWorkflow
gw = GridWorkflow(fakeindex, gridspec)
query = dict(product='fake_product_name')

cells = gw.list_cells(**query)
for cell_index, cell in cells.items():

# test Tile.split()
for label, tile in cell.split('time'):
assert tile.shape == (1, 10, 10)

# test Tile.split_by_time()
for year, year_cell in cell.split_by_time(freq='A'):
for t in year_cell.sources.time.values:
assert str(t)[:4] == year
