Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates test_execute_transform #643

Merged
merged 2 commits into from Mar 7, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
136 changes: 14 additions & 122 deletions ion/processes/data/transforms/test/test_transform_prime.py
Expand Up @@ -36,125 +36,9 @@ def setUp(self):
self.data_product_management = DataProductManagementServiceClient()


def _L0_pdict(self):
contexts = {}

t_ctxt = ParameterContext('TIME', param_type=QuantityType(value_encoding=np.dtype('int64')))
t_ctxt.uom = 'seconds since 01-01-1900'
t_ctxt_id = self.dataset_management.create_parameter_context(name='test_TIME', parameter_context=t_ctxt.dump())
self.addCleanup(self.dataset_management.delete_parameter_context, t_ctxt_id)
contexts['TIME'] = t_ctxt_id

lat_ctxt = ParameterContext('LAT', param_type=ConstantType(QuantityType(value_encoding=np.dtype('float32'))), fill_value=-9999)
lat_ctxt.axis = AxisTypeEnum.LAT
lat_ctxt.uom = 'degree_north'
lat_ctxt_id = self.dataset_management.create_parameter_context(name='test_LAT', parameter_context=lat_ctxt.dump())
self.addCleanup(self.dataset_management.delete_parameter_context, lat_ctxt_id)
contexts['LAT'] = lat_ctxt_id

lon_ctxt = ParameterContext('LON', param_type=ConstantType(QuantityType(value_encoding=np.dtype('float32'))), fill_value=-9999)
lon_ctxt.axis = AxisTypeEnum.LON
lon_ctxt.uom = 'degree_east'
lon_ctxt_id = self.dataset_management.create_parameter_context(name='test_LON', parameter_context=lon_ctxt.dump())
self.addCleanup(self.dataset_management.delete_parameter_context, lon_ctxt_id)
contexts['LON'] = lon_ctxt_id

# Independent Parameters

# Temperature - values expected to be the decimal results of conversion from hex
temp_ctxt = ParameterContext('TEMPWAT_L0', param_type=QuantityType(value_encoding=np.dtype('float32')), fill_value=-9999)
temp_ctxt.uom = 'deg_C'
temp_ctxt_id = self.dataset_management.create_parameter_context(name='test_TEMPWAT_L0', parameter_context=temp_ctxt.dump())
self.addCleanup(self.dataset_management.delete_parameter_context, temp_ctxt_id)
contexts['TEMPWAT_L0'] = temp_ctxt_id

# Conductivity - values expected to be the decimal results of conversion from hex
cond_ctxt = ParameterContext('CONDWAT_L0', param_type=QuantityType(value_encoding=np.dtype('float32')), fill_value=-9999)
cond_ctxt.uom = 'S m-1'
cond_ctxt_id = self.dataset_management.create_parameter_context(name='test_CONDWAT_L0', parameter_context=cond_ctxt.dump())
self.addCleanup(self.dataset_management.delete_parameter_context, cond_ctxt_id)
contexts['CONDWAT_L0'] = cond_ctxt_id

# Pressure - values expected to be the decimal results of conversion from hex
press_ctxt = ParameterContext('PRESWAT_L0', param_type=QuantityType(value_encoding=np.dtype('float32')), fill_value=-9999)
press_ctxt.uom = 'dbar'
press_ctxt_id = self.dataset_management.create_parameter_context(name='test_PRESWAT_L0', parameter_context=press_ctxt.dump())
self.addCleanup(self.dataset_management.delete_parameter_context, press_ctxt_id)
contexts['PRESWAT_L0'] = press_ctxt_id

context_ids = contexts.values()

pdict_id = self.dataset_management.create_parameter_dictionary('L0 SBE37', parameter_context_ids=context_ids, temporal_context='TIME')
self.addCleanup(self.dataset_management.delete_parameter_dictionary, pdict_id)

return pdict_id


def _L1_pdict(self):
contexts = {}
funcs = {}
# Dependent Parameters
t_ctxt = ParameterContext('TIME', param_type=QuantityType(value_encoding=np.dtype('int64')))
t_ctxt.uom = 'seconds since 01-01-1900'
t_ctxt_id = self.dataset_management.create_parameter_context(name='test_TIME', parameter_context=t_ctxt.dump())
contexts['TIME'] = t_ctxt_id

# TEMPWAT_L1 = (TEMPWAT_L0 / 10000) - 10
tl1_func = '(T / 10000) - 10'
expr = NumexprFunction('TEMPWAT_L1', tl1_func, ['T'])
expr_id = self.dataset_management.create_parameter_function(name='test_TEMPWAT_L1', parameter_function=expr.dump())
self.addCleanup(self.dataset_management.delete_parameter_function, expr_id)
funcs['TEMPWAT_L1'] = expr, expr_id

tl1_pmap = {'T': 'TEMPWAT_L0'}
expr.param_map = tl1_pmap
tempL1_ctxt = ParameterContext('TEMPWAT_L1', param_type=ParameterFunctionType(function=expr), variability=VariabilityEnum.TEMPORAL)
tempL1_ctxt.uom = 'deg_C'
tempL1_ctxt_id = self.dataset_management.create_parameter_context(name='test_TEMPWAT_L1', parameter_context=tempL1_ctxt.dump(), parameter_function_ids=[expr_id])
self.addCleanup(self.dataset_management.delete_parameter_context, tempL1_ctxt_id)
contexts['TEMPWAT_L1'] = tempL1_ctxt_id

# CONDWAT_L1 = (CONDWAT_L0 / 100000) - 0.5
cl1_func = '(C / 100000) - 0.5'
expr = NumexprFunction('CONDWAT_L1', cl1_func, ['C'])
expr_id = self.dataset_management.create_parameter_function(name='test_CONDWAT_L1', parameter_function=expr.dump())
self.addCleanup(self.dataset_management.delete_parameter_function, expr_id)
funcs['CONDWAT_L1'] = expr, expr_id

cl1_pmap = {'C': 'CONDWAT_L0'}
expr.param_map = cl1_pmap
condL1_ctxt = ParameterContext('CONDWAT_L1', param_type=ParameterFunctionType(function=expr), variability=VariabilityEnum.TEMPORAL)
condL1_ctxt.uom = 'S m-1'
condL1_ctxt_id = self.dataset_management.create_parameter_context(name='test_CONDWAT_L1', parameter_context=condL1_ctxt.dump(), parameter_function_ids=[expr_id])
self.addCleanup(self.dataset_management.delete_parameter_context, condL1_ctxt_id)
contexts['CONDWAT_L1'] = condL1_ctxt_id

# Equation uses p_range, which is a calibration coefficient - Fixing to 679.34040721
# PRESWAT_L1 = (PRESWAT_L0 * p_range / (0.85 * 65536)) - (0.05 * p_range)
pl1_func = '(P * p_range / (0.85 * 65536)) - (0.05 * p_range)'
expr = NumexprFunction('PRESWAT_L1', pl1_func, ['P', 'p_range'])
expr_id = self.dataset_management.create_parameter_function(name='test_PRESWAT_L1', parameter_function=expr.dump())
self.addCleanup(self.dataset_management.delete_parameter_function, expr_id)
funcs['PRESWAT_L1'] = expr, expr_id

pl1_pmap = {'P': 'PRESWAT_L0', 'p_range': 679.34040721}
expr.param_map = pl1_pmap
presL1_ctxt = ParameterContext('PRESWAT_L1', param_type=ParameterFunctionType(function=expr), variability=VariabilityEnum.TEMPORAL)
presL1_ctxt.uom = 'S m-1'
presL1_ctxt_id = self.dataset_management.create_parameter_context(name='test_CONDWAT_L1', parameter_context=presL1_ctxt.dump(), parameter_function_ids=[expr_id])
self.addCleanup(self.dataset_management.delete_parameter_context, presL1_ctxt_id)
contexts['PRESWAT_L1'] = presL1_ctxt_id
context_ids = contexts.values()

pdict_id = self.dataset_management.create_parameter_dictionary('L1 SBE37', parameter_context_ids=context_ids, temporal_context='TIME')
self.addCleanup(self.dataset_management.delete_parameter_dictionary, pdict_id)

return pdict_id


def setup_streams(self):
in_pdict_id = self._L0_pdict()
out_pdict_id = self._L1_pdict()
in_pdict_id = self.dataset_management.read_parameter_dictionary_by_name('sbe37_L0_test', id_only=True)
out_pdict_id = self.dataset_management.read_parameter_dictionary_by_name('sbe37_L1_test', id_only=True)

in_stream_def_id = self.pubsub_management.create_stream_definition('L0 SBE37', parameter_dictionary_id=in_pdict_id)
self.addCleanup(self.pubsub_management.delete_stream_definition, in_stream_def_id)
Expand All @@ -168,13 +52,21 @@ def setup_streams(self):

return [(in_stream_id, in_stream_def_id), (out_stream_id, out_stream_def_id)]

def preload(self):
config = DotDict()
config.op = 'load'
config.scenario = 'BASE,LC_TEST'
config.categories = 'ParameterFunctions,ParameterDefs,ParameterDictionary'
config.path = 'res/preload/r2_ioc'

self.container.spawn_process('preload','ion.processes.bootstrap.ion_loader','IONLoader', config)

@attr('LOCOINT')
@unittest.skipIf(os.getenv('CEI_LAUNCH_TEST', False), 'Skip test while in CEI LAUNCH mode')
def test_execute_transform(self):
self.preload()
queue_name = 'transform_prime'

tp = TransformPrime()
stream_info = self.setup_streams()
in_stream_id, in_stream_def_id = stream_info[0]
out_stream_id, out_stream_def_id = stream_info[1]
Expand Down Expand Up @@ -221,13 +113,13 @@ def validator(msg, route, stream_id):
publisher = StandaloneStreamPublisher(in_stream_id, in_route)

outbound_rdt = RecordDictionaryTool(stream_definition_id=in_stream_def_id)
outbound_rdt['TIME'] = [0]
outbound_rdt['time'] = [0]
outbound_rdt['TEMPWAT_L0'] = [280000]
outbound_rdt['CONDWAT_L0'] = [100000]
outbound_rdt['PRESWAT_L0'] = [2789]

outbound_rdt['LAT'] = [45]
outbound_rdt['LON'] = [-71]
outbound_rdt['lat'] = [45]
outbound_rdt['lon'] = [-71]

outbound_granule = outbound_rdt.to_granule()

Expand Down
4 changes: 1 addition & 3 deletions ion/services/dm/distribution/pubsub_management_service.py
Expand Up @@ -87,9 +87,7 @@ def compare_stream_definition(self, stream_definition1_id='', stream_definition2
def compatible_stream_definitions(self, in_stream_definition_id, out_stream_definition_id):
if in_stream_definition_id == out_stream_definition_id and self.read_stream_definition(in_stream_definition_id):
return True
def1 = self.read_stream_definition(in_stream_definition_id)
def2 = self.read_stream_definition(out_stream_definition_id)
return self._compare_pdicts(def1.parameter_dictionary, def2.parameter_dictionary)
return self.validate_stream_defs(in_stream_definition_id, out_stream_definition_id)

def validate_stream_defs(self, in_stream_definition_id, out_stream_definition_id):
stream_def_in = self.read_stream_definition(in_stream_definition_id)
Expand Down
2 changes: 1 addition & 1 deletion ion/services/dm/inventory/dataset_management_service.py
Expand Up @@ -252,7 +252,7 @@ def read_parameter_contexts(self, parameter_dictionary_id='', id_only=False):
def read_parameter_dictionary_by_name(self, name='', id_only=False):
res, _ = self.clients.resource_registry.find_resources(restype=RT.ParameterDictionary, name=name, id_only=id_only)
if not len(res):
raise NotFound('Unable to locate context with name: %s' % name)
raise NotFound('Unable to locate dictionary with name: %s' % name)
return res[0]

#--------
Expand Down
19 changes: 9 additions & 10 deletions ion/services/sa/process/data_process_management_service.py
Expand Up @@ -824,17 +824,16 @@ def _launch_data_process(self, queue_name='', data_process_definition_id='', out
return self._launch_process(queue_name, out_streams, process_definition_id, configuration)







def _validator(self, in_data_product_id, out_data_product_id):
in_stream = self._get_stream_from_dp(dp_id=in_data_product_id)
in_stream_def = self.clients.resource_registry.find_objects(subject=in_stream._id, predicate=PRED.hasStreamDefinition, id_only=False)
out_stream = self._get_stream_from_dp(dp_id=out_data_product_id)
out_stream_def = self.clients.resource_registry.find_objects(subject=out_stream._id, predicate=PRED.hasStreamDefinition, id_only=False)
return self.clients.pubsub_management.compatible_stream_definitions(in_stream_definition_id=in_stream_def, out_stream_definition_id=out_stream_def)
in_stream_id = self._get_stream_from_dp(dp_id=in_data_product_id)
in_stream_defs, _ = self.clients.resource_registry.find_objects(subject=in_stream_id, predicate=PRED.hasStreamDefinition, id_only=True)
if not len(in_stream_defs):
raise BadRequest('No valid stream definition defined for data product stream')
out_stream_id = self._get_stream_from_dp(dp_id=out_data_product_id)
out_stream_defs, _ = self.clients.resource_registry.find_objects(subject=out_stream_id, predicate=PRED.hasStreamDefinition, id_only=True)
if not len(out_stream_defs):
raise BadRequest('No valid stream definition defined for data product stream')
return self.clients.pubsub_management.compatible_stream_definitions(in_stream_definition_id=in_stream_defs[0], out_stream_definition_id=out_stream_defs[0])

def validate_compatibility(self, in_data_product_ids=None, out_data_product_ids=None, routes=None):
'''
Expand Down