Skip to content

Commit

Permalink
Merge b7c52a6 into ca72ac5
Browse files Browse the repository at this point in the history
  • Loading branch information
uchchwhash committed Feb 6, 2019
2 parents ca72ac5 + b7c52a6 commit 404c6e1
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 73 deletions.
90 changes: 51 additions & 39 deletions datacube/virtual/__init__.py
Expand Up @@ -2,6 +2,7 @@

from .impl import VirtualProduct, Transformation, VirtualProductException
from .transformations import MakeMask, ApplyMask, ToFloat, Rename, Select
from .transformations import Mean, year, month, week, day
from .utils import reject_keys

from datacube.model import Measurement
Expand All @@ -14,61 +15,52 @@
class NameResolver:
""" Apply a mapping from name to callable objects in a recipe. """

def __init__(self, **lookup_table):
def __init__(self, lookup_table):
self.lookup_table = lookup_table

def construct(self, **recipe) -> VirtualProduct:
""" Validate recipe and construct virtual product. """

get = recipe.get

kind_keys = {key for key in recipe if key in ['product', 'transform', 'collate', 'juxtapose']}
if len(kind_keys) < 1:
raise VirtualProductException("virtual product kind not specified in {}".format(recipe))
elif len(kind_keys) > 1:
raise VirtualProductException("ambiguous kind in {}".format(recipe))

if 'product' in recipe:
def resolve_func(key, value):
if key not in ['fuse_func', 'dataset_predicate']:
return value

if callable(value):
return value
def lookup(name, namespace=None, kind='transformation'):
if callable(name):
return name

if namespace is not None and namespace in self.lookup_table and name in self.lookup_table[namespace]:
result = self.lookup_table[namespace][name]
else:
try:
return import_function(value)
result = import_function(name)
except (ImportError, AttributeError):
raise VirtualProductException("could not resolve function {} in {}".format(key, recipe))
msg = "could not resolve {} {} in {}".format(kind, name, recipe)
raise VirtualProductException(msg)

return VirtualProduct({key: resolve_func(key, value) for key, value in recipe.items()})
if not callable(result):
raise VirtualProductException("{} not callable in {}".format(kind, recipe))

if 'transform' in recipe:
def resolve_transform(cls_name):
if callable(cls_name):
return cls_name
return result

if cls_name in self.lookup_table:
cls = self.lookup_table[cls_name]
else:
try:
cls = import_function(cls_name)
except (ImportError, AttributeError):
msg = "could not resolve transformation {} in {}".format(cls_name, recipe)
raise VirtualProductException(msg)

if not callable(cls):
raise VirtualProductException("transformation not callable in {}".format(recipe))
kind_keys = {key for key in recipe if key in ['product', 'transform', 'collate', 'juxtapose', 'aggregate']}
if len(kind_keys) < 1:
raise VirtualProductException("virtual product kind not specified in {}".format(recipe))
elif len(kind_keys) > 1:
raise VirtualProductException("ambiguous kind in {}".format(recipe))

return cls
if 'product' in recipe:
func_keys = ['fuse_func', 'dataset_predicate']
return VirtualProduct({key: value if key not in func_keys else lookup(value, kind='function')
for key, value in recipe.items()})

if 'transform' in recipe:
cls_name = recipe['transform']
input_product = get('input')

if input_product is None:
raise VirtualProductException("no input for transformation in {}".format(recipe))

return VirtualProduct(dict(transform=resolve_transform(cls_name), input=self.construct(**input_product),
return VirtualProduct(dict(transform=lookup(cls_name, 'transform'),
input=self.construct(**input_product),
**reject_keys(recipe, ['transform', 'input'])))

if 'collate' in recipe:
Expand All @@ -85,14 +77,34 @@ def resolve_transform(cls_name):
return VirtualProduct(dict(juxtapose=[self.construct(**child) for child in recipe['juxtapose']],
**reject_keys(recipe, ['juxtapose'])))

if 'aggregate' in recipe:
cls_name = recipe['aggregate']
input_product = get('input')
group_by = get('group_by')

if input_product is None:
raise VirtualProductException("no input for aggregate in {}".format(recipe))
if group_by is None:
raise VirtualProductException("no group_by for aggregate in {}".format(recipe))

return VirtualProduct(dict(aggregate=lookup(cls_name, 'aggregate'),
group_by=lookup(group_by, 'aggregate/group_by', kind='group_by'),
input=self.construct(**input_product),
**reject_keys(recipe, ['aggregate', 'input', 'group_by'])))

raise VirtualProductException("could not understand virtual product recipe: {}".format(recipe))


DEFAULT_RESOLVER = NameResolver(make_mask=MakeMask,
apply_mask=ApplyMask,
to_float=ToFloat,
rename=Rename,
select=Select)
DEFAULT_RESOLVER = NameResolver({'transform': dict(make_mask=MakeMask,
apply_mask=ApplyMask,
to_float=ToFloat,
rename=Rename,
select=Select),
'aggregate': dict(mean=Mean),
'aggregate/group_by': dict(year=year,
month=month,
week=week,
day=day)})


def construct(**recipe: Mapping[str, Any]) -> VirtualProduct:
Expand Down

0 comments on commit 404c6e1

Please sign in to comment.