
Data product preload update for derived science data products

Commit e0255248c5c1ac18c4c300c4fdc13ce8e9a133ba (1 parent: f73182c) by Michael Meisinger, committed Nov 21, 2013
README_DEMO (25 lines changed)
@@ -20,15 +20,16 @@ Prepare for start:
Start the services container with Container UI:
> bin/pycc -fc -r res/deploy/r2deploy.yml --mx
-Preload ALPHA setup (with UI, OOI assets, demo resources); this can take many minutes to complete before executing the next cmd:
+Preload ALPHA setup (with UI, OOI assets, demo resources); this can take many minutes to complete. Add path=master
+to run against the master Google spreadsheet:
> bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader cfg=res/preload/r2_ioc/config/ooi_alpha.yml
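For example, to run the same preload against the master spreadsheet (assuming the config file accepts the path override):
> bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader cfg=res/preload/r2_ioc/config/ooi_alpha.yml path=master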
Start the UI (in ion-ux, virtualenv prepared as documented):
> python main.py
Demo steps (using ALPHA preload):
-===========================================
+=================================
Go to "CI Bench Test Facility", choose "Platforms", select "Low Power JBox - RSN Bench Testing"
- From commands menu, select command
@@ -42,14 +43,11 @@ Go to "CI Bench Test Facility", choose "Instruments", select "CTD Simulator"
Individual preload commands:
============================
-Preload system UI only
+Preload system UI definitions
> bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=loadui ui_path='http://userexperience.oceanobservatories.org/database-exports/Candidates'
-Preload system resources (base setup, without OOI resources or UI)
-> bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load scenario=BETA attachments=res/preload/r2_ioc/attachments
-
-Preload system resources (base demo setup, without OOI resources or UI)
-> bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load scenario=BETA,R2_DEMO attachments=res/preload/r2_ioc/attachments
+Preload system resources for a quick demo/test (without OOI resources or UI)
+> bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load scenario=BETA
Preload OOI assets (bulk mode)
> bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load assets=res/preload/r2_ioc/ooi_assets loadooi=True bulk=True
@@ -112,3 +110,14 @@ Datastore work:
Dump the contents of the resource registry as an XLSX file in ./interface/resources_<timestamp>.
> bin/pycc -x ion.processes.bootstrap.datastore_loader.DatastoreLoader op=dumpres
+
+
+Testing with OOI Data:
+======================
+
+- Create directory /tmp/dsatest
+- Copy some *.mrg files from test_data/global_glider into /tmp/dsatest
+- Edit the External Dataset Agent Instance resource and set driver_config to:
+ {"startup_config":{"parser":{},"harvester":{"directory":"/tmp/dsatest","pattern":"unit*.mrg"}},"max_records":50}
+- Activate persistence for parsed data product (in stack menu)
+- Start instrument (data) agent and execute INITIALIZE, GO_ACTIVE, RUN, AUTOSAMPLE
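
The driver_config edit above can also be scripted from an interactive pycc shell instead of the UI. A minimal sketch, assuming pyon's Container singleton is available in the shell and using a hypothetical agent instance name (adjust to the resource created by your preload):

  from pyon.public import RT
  from pyon.container.cc import Container

  # Resource registry of the running container (assumes a started pycc container)
  rr = Container.instance.resource_registry

  # Look up the ExternalDatasetAgentInstance resource; the name here is a hypothetical placeholder
  edai_list, _ = rr.find_resources(restype=RT.ExternalDatasetAgentInstance,
                                   name="Glider Dataset Agent Instance", id_only=False)
  edai = edai_list[0]

  # Point the harvester at the test directory created above and cap the record batch size
  edai.driver_config = {
      "startup_config": {
          "parser": {},
          "harvester": {"directory": "/tmp/dsatest", "pattern": "unit*.mrg"},
      },
      "max_records": 50,
  }
  rr.update(edai)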
ion/processes/bootstrap/ion_loader.py (120 lines changed)
@@ -5,13 +5,13 @@
@see https://github.com/ooici/coi-services/blob/master/README_DEMO
Examples (see also README_DEMO linked above):
- bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path=master scenario=R2_DEMO
- bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path=res/preload/r2_ioc/R2PreloadedResources.xlsx scenario=R2_DEMO
- bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path="https://docs.google.com/spreadsheet/pub?key=0AttCeOvLP6XMdG82NHZfSEJJOGdQTkgzb05aRjkzMEE&output=xls" scenario=R2_DEMO
- bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path=res/preload/r2_ioc scenario=R2_DEMO
+ bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path=master scenario=BETA
+ bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path=res/preload/r2_ioc/R2PreloadedResources.xlsx scenario=BETA
+ bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path="https://docs.google.com/spreadsheet/pub?key=0AttCeOvLP6XMdG82NHZfSEJJOGdQTkgzb05aRjkzMEE&output=xls" scenario=BETA
+ bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path=res/preload/r2_ioc scenario=BETA
bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=loadui path=res/preload/r2_ioc
- bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=loadui path=https://userexperience.oceanobservatories.org/database-exports/
+ bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=loadui ui_path=https://userexperience.oceanobservatories.org/database-exports/
bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path=master assets=res/preload/r2_ioc/ooi_assets scenario=R2_DEMO loadooi=True
bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path=res/preload/r2_ioc scenario=R2_DEMO loadooi=True assets=res/preload/r2_ioc/ooi_assets
@@ -25,25 +25,26 @@
Options:
cfg= Path to a preload config file that allows scripted preload runs with defined params
op= the basic operation to execute (e.g. load, loadui, parseui, deleteooi)
- bulk= if True, uses RR bulk insert operations to load, not service calls
- debug= if True, allows a few shortcuts to perform faster loads
path= override location (dir, GoogleDoc or XLSX file) for preload rows (default is TESTED_DOC; "master" is recognized)
- attachments= override location to get file attachments (default is path)
+ attachments= override location to get file attachments (default is path + '/attachments')
ui_path= override location to get UI preload files (default is path + '/ui_assets')
assets= override location to get OOI asset file (default is path + '/ooi_assets')
+ assetmappings= override location for OOI mapping spreadsheet (default is GoogleDoc)
categories= list of categories to import
excludecategories= list of categories to NOT import
clearcols= list of column names to clear (set to empty string) before preloading
- loadooi= if True (default is False) loads resources based on OOI assets and ooiuntil argument
loadui= if True (default is False) loads the UI spec
+ loadooi= if True (default is False) loads resources based on OOI assets and ooiuntil argument
parseooi= if True (default is False) reads and parses OOI asset information
- idmap= if True, the IDMap category is used to substitute preload ids (used in certain OOI preload runs)
- assetmappings= override location for OOI mapping spreadsheet (default is GoogleDoc)
+
+ idmap= if True, the IDMap category is used to substitute preload ids
ooifilter= one or comma separated list of CE,CP,GA,GI,GP,GS,ES to limit ooi resource import
- ooiexclude= synonymous to excludecategories. Don't use
ooiuntil= datetime of latest planned deployment date to consider for data product etc import mm/dd/yyyy
ooiparams= if True (default is False) create links to OOI parameter definitions
ooipartial= if True (default is False) creates resources (data products etc) even if not all inputs are there
+
+ debug= if True, allows shortcuts to perform faster loads (where possible)
+ bulk= if True, uses RR bulk insert operations to load, not service calls
exportui= if True, writes interface/ui_specs.json with UI object
revert= if True (and debug==True) remove all new resources and associations created if preload fails
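    For example, a hypothetical filtered OOI load combining several of the options above (the ooifilter and ooiuntil values are illustrative only):
    bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path=master scenario=BETA loadooi=True assets=res/preload/r2_ioc/ooi_assets ooifilter=CE ooiuntil=06/30/2014 bulk=True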
@@ -116,12 +117,11 @@
### the URL below should point to a COPY of the master google spreadsheet that works with this version of the loader
TESTED_DOC = "https://docs.google.com/spreadsheet/pub?key=0AgjFgozf2vG6dDM4MHNOZjd1eHV3Z3JqbmdfTEJzLXc&output=xls"
-
-#
### while working on changes to the google doc, use this to run test_loader.py against the master spreadsheet
#TESTED_DOC=MASTER_DOC
DEFAULT_ASSETS_PATH = "res/preload/r2_ioc/ooi_assets"
+DEFAULT_ATTACHMENTS_PATH = "res/preload/r2_ioc/attachments"
# URL of the mapping spreadsheet for OOI assets
OOI_MAPPING_DOC = "https://docs.google.com/spreadsheet/pub?key=0AttCeOvLP6XMdFVUeDdoUTU0b0NFQ1dCVDhuUjY0THc&output=xls"
@@ -266,7 +266,10 @@ def _do_preload(self, config):
self.path = config.get("path", None) or TESTED_DOC # handle case where path is explicitly set to None
if self.path=='master':
self.path = MASTER_DOC
- self.attachment_path = config.get("attachments", self.path + '/attachments')
+ self.attachment_path = config.get("attachments", None)
+ if not self.attachment_path:
+ self.attachment_path = DEFAULT_ATTACHMENTS_PATH if self.path.startswith('http') or self.path.endswith('xlsx') else self.path + '/attachments'
+
self.asset_path = config.get("assets", None)
if not self.asset_path:
self.asset_path = DEFAULT_ASSETS_PATH if self.path.startswith('http') or self.path.endswith('xlsx') else self.path + "/ooi_assets"
@@ -2807,7 +2810,7 @@ def _load_ExternalDatasetAgentInstance(self, row):
svc_client.assign_external_dataset_agent_instance_to_device(edai_id, device_id, headers=headers)
def _load_ExternalDatasetAgentInstance_OOI(self):
- # TBD create for dataset agent instruments
+ # @see _load_InstrumentAgentInstance_OOI
pass
# -------------------------------------------------------------------------
@@ -2879,8 +2882,11 @@ def _load_DataProduct(self, row, do_bulk=False):
if gcrs_id:
res_obj.geospatial_coordinate_reference_system = self.resource_ids[gcrs_id]
parent_id = None
- if row['parent'] and row['parent'] in self.resource_ids:
- parent_id = self.resource_ids[row['parent']]
+ if row['parent']:
+ if row['parent'] in self.resource_ids:
+ parent_id = self.resource_ids[row['parent']]
+ else:
+ log.warn("DataProduct %s parent reference %s not found", row[COL_ID], row['parent'])
res_obj.spatial_domain = sdom.dump()
res_obj.temporal_domain = tdom.dump()
@@ -2897,19 +2903,20 @@ def _load_DataProduct(self, row, do_bulk=False):
# Create and associate Stream
# Create and associate Dataset
else:
- svc_client = self._get_service_client("data_product_management")
+ dpms_client = self._get_service_client("data_product_management")
stream_definition_id = self.resource_ids[row["stream_def_id"]] if row["stream_def_id"] else None
if stream_definition_id:
- res_id = svc_client.create_data_product(data_product=res_obj, stream_definition_id=stream_definition_id,
- parent_data_product_id=parent_id,
- headers=headers)
+ res_id = dpms_client.create_data_product(data_product=res_obj,
+ stream_definition_id=stream_definition_id,
+ parent_data_product_id=parent_id,
+ headers=headers)
else:
- res_id = svc_client.create_data_product_(data_product=res_obj,
- headers=headers)
+ res_id = dpms_client.create_data_product_(data_product=res_obj,
+ headers=headers)
self._register_id(row[COL_ID], res_id, res_obj)
if not self.debug and get_typed_value(row['persist_data'], targettype="bool"):
- svc_client.activate_data_product_persistence(res_id, headers=headers)
+ dpms_client.activate_data_product_persistence(res_id, headers=headers)
self._resource_assign_org(row, res_id)
self._resource_advance_lcs(row, res_id)
@@ -3057,7 +3064,7 @@ def create_dp_link(dp_id, source_id="", res_type="", do_bulk=self.bulk):
if num_dp_generated:
log.debug(" ...generated %s data products", num_dp_generated)
- # II. Instrument data products (raw, parsed, engineering, science L0, L1, L2)
+ # II. Instrument data products (raw, parsed, engineering, derived science L0, L1, L2)
for inst_id, inst_obj in inst_objs.iteritems():
num_dp_generated = 0
ooi_rd = OOIReferenceDesignator(inst_id)
@@ -3078,8 +3085,11 @@ def create_dp_link(dp_id, source_id="", res_type="", do_bulk=self.bulk):
iagent_res_obj = self._get_resource_obj("IA_" + ia_code, True) if ia_code else None
dart_code = series_obj["dart_code"]
dagent_res_obj = self._get_resource_obj(dart_code, True) if dart_code else None
+ ia_enabled = iagent_res_obj and series_obj.get("ia_exists", False) and instagent_objs[series_obj["ia_code"]]["active"]
+ dart_enabled = dagent_res_obj and series_obj.get("dart_exists", False)
- parsed_pdict_id = ""
+ parsed_pdict_id, parsed_id = "", ""
+ # (1) Generate stream DataProducts (raw, parsed, engineering)
if iagent_res_obj or dagent_res_obj:
log.debug("Generating DataProducts for %s from instrument/data agent %s streams and SAF", inst_id,
ia_code if iagent_res_obj else dart_code)
@@ -3100,6 +3110,7 @@ def create_dp_link(dp_id, source_id="", res_type="", do_bulk=self.bulk):
newrow['dp/ooi_product_name'] = ""
newrow['dp/processing_level_code'] = "Parsed"
parsed_pdict_id = pdict_by_name[scfg.parameter_dictionary_name]
+ parsed_id = dp_id
else:
if scfg.stream_type == StreamConfigurationType.PARSED:
log.warn("Instrument %s (agent %s) has more than one PARSED stream: %s (first pdict id=%s)",
@@ -3111,21 +3122,17 @@ def create_dp_link(dp_id, source_id="", res_type="", do_bulk=self.bulk):
newrow['contact_ids'] = ''
newrow['geo_constraint_id'] = const_id1
newrow['coordinate_system_id'] = 'OOI_SUBMERGED_CS'
- newrow['persist_data'] = 'False' # TODO: This may need be True
+ newrow['persist_data'] = 'False' # Set persist_data to false - no ingestion worker
+ newrow['parent'] = ''
+ newrow['lcstate'] = "DEPLOYED_AVAILABLE"
pdict_id = pdict_by_name[scfg.parameter_dictionary_name]
strdef_id = self._create_dp_stream_def(inst_id, pdict_id, scfg.stream_name)
- ia_enabled = iagent_res_obj and series_obj.get("ia_exists", False) and instagent_objs[series_obj["ia_code"]]["active"]
- dart_enabled = dagent_res_obj and series_obj.get("dart_exists", False)
- if ia_enabled or dart_enabled:
- newrow['stream_def_id'] = strdef_id
- newrow['parent'] = ''
- newrow['lcstate'] = "DEPLOYED_AVAILABLE"
- else:
- if ia_enabled:
- log.warn("INCONSISTENCY. Should have StreamDefinition for ParamDict %s", scfg.parameter_dictionary_name)
- newrow['stream_def_id'] = ''
- newrow['parent'] = ''
+ newrow['stream_def_id'] = strdef_id
+
+ if not (ia_enabled or dart_enabled):
+ log.debug("No data product for %s:%s - agent not enabled", inst_id, scfg.stream_name)
+ continue
if not self._resource_exists(dp_id):
self._load_DataProduct(newrow)
@@ -3138,7 +3145,7 @@ def create_dp_link(dp_id, source_id="", res_type="", do_bulk=self.bulk):
log.debug("Generating DataProducts for %s using SAF and defaults (no streams)", inst_id)
# There is no agent defined. Just create basic raw and parsed data products
- # (0) Device Data Product - raw
+ # Raw instrument DataProduct
newrow = {}
dp_id = inst_id + "_DPI0"
newrow[COL_ID] = dp_id
@@ -3159,7 +3166,7 @@ def create_dp_link(dp_id, source_id="", res_type="", do_bulk=self.bulk):
create_dp_link(dp_id, inst_id + "_ID", 'InstrumentDevice')
create_dp_link(dp_id, inst_id)
- # (1) Device Data Product - parsed
+ # Parsed instrument DataProduct
newrow = {}
dp_id = inst_id + "_DPI1"
newrow[COL_ID] = dp_id
@@ -3182,11 +3189,11 @@ def create_dp_link(dp_id, source_id="", res_type="", do_bulk=self.bulk):
create_dp_link(dp_id, inst_id)
else:
- # There is no agent defined. Wait generating DataProducts
+ # There is no agent defined. Defer generating DataProducts to incremental run
#log.debug("Not generating DataProducts for %s - no agent/streams defined", inst_id)
pass
- # (3*) Data Product DPS - Level (per site)
+ # (2) Generate derived DataProducts for L0/L1/L2 based on SAF DPS - Level (per site)
data_product_list = inst_obj.get('data_product_list', [])
for dptype_id in data_product_list:
dp_id = inst_id + "_" + dptype_id + "_DPID"
@@ -3255,18 +3262,39 @@ def create_dp_link(dp_id, source_id="", res_type="", do_bulk=self.bulk):
newrow['contact_ids'] = ''
newrow['geo_constraint_id'] = const_id1
newrow['coordinate_system_id'] = 'OOI_SUBMERGED_CS'
- newrow['parent'] = inst_id + "_DPI1"
+ newrow['parent'] = parsed_id
newrow['persist_data'] = 'False'
parsed_pdict_obj = self._get_resource_obj(parsed_pdict_id, True)
if parsed_pdict_obj:
# Find all parameters based on the parsed param dict that belong to this DP (prefix)
param_list = ["PD7"]
+ found_dptype_match = False
params = pdict_map[parsed_pdict_obj.name]
for param in params:
param_obj = self._get_resource_obj(param)
- if param_obj.ooi_short_name.startswith(dp_obj['code']): # TODO: What about the level ambiguity?
+ if param_obj.ooi_short_name == dptype_id:
+ # Param match: SAF DPS+level == existing parameter name
+ param_list.append(param)
+ found_dptype_match = True
+ elif param_obj.ooi_short_name.startswith(dp_obj['code']):
+ # Param prefix match: SAF DPS == existing parameter name
+ # CAUTION: Preload spreadsheet ParamDef "Data Product Identifier" column contains
+ # non-compliant values e.g. VELPROF-VLN_L0 or VELPROF-PCG.
param_list.append(param)
+ if param_obj.ooi_short_name.endswith(dp_obj['level']):
+ # Param match: SAF DPS+level == existing parameter name plus extension
+ # e.g. VELPROF-VLN_L0. This means this DPS has more than 1 value. All OK
+ found_dptype_match = True
+ else:
+ # The prefix is the DPS code but level is ambiguous
+ # Param DPS match: SAF DPS == existing parameter name
+ # WARNING: Level is ambiguous. Accept for all levels
+ pass
+
+ if len(param_list) <= 1 or not found_dptype_match:
+ log.debug(" skip DataProduct %s : %s. In SAF but not found in parsed PDICT", inst_id, dptype_id)
+ continue
av_fields = ",".join(self._get_resource_obj(pid).name for pid in param_list)
strdef_id = self._create_dp_stream_def(inst_id, parsed_pdict_id, dptype_id, av_fields)
@@ -3289,7 +3317,7 @@ def create_dp_link(dp_id, source_id="", res_type="", do_bulk=self.bulk):
create_dp_link(dp_id, inst_id)
else:
- pass # Ignore this derived data product in case we don't have stream info
+ pass # Ignore this derived data product because we don't have parsed param dict
if num_dp_generated:
log.debug(" ...generated %s data products", num_dp_generated)
ion/services/sa/acquisition/data_acquisition_management_service.py (20 lines changed)
@@ -25,6 +25,7 @@
from interface.objects import ProcessDefinition, ProcessSchedule, ProcessTarget, ProcessRestartMode
from interface.objects import Parser, DataProducer, InstrumentProducerContext, ExtDatasetProducerContext, DataProcessProducerContext
from interface.objects import AttachmentType
+from interface.services.sa.idata_product_management_service import DataProductManagementServiceProcessClient
from interface.services.sa.idata_acquisition_management_service import BaseDataAcquisitionManagementService
@@ -34,6 +35,7 @@ class DataAcquisitionManagementService(BaseDataAcquisitionManagementService):
def on_init(self):
self.RR2 = EnhancedResourceRegistryClient(self.clients.resource_registry)
+ self.DPMS = DataProductManagementServiceProcessClient(self) # TODO: Add to clients
# -----------------
# The following operations register different types of data producers
@@ -767,6 +769,21 @@ def assign_external_dataset_agent_instance_to_device(self, external_dataset_agen
def unassign_external_dataset_agent_instance_from_device(self, external_dataset_agent_instance_id='', device_id=''):
self.clients.resource_registry.delete_association((device_id, PRED.hasAgentInstance, external_dataset_agent_instance_id))
+ def _assert_persistence_on(self, config_builder):
+ if not config_builder or RT.DataProduct not in config_builder.associated_objects:
+ return
+ data_products = config_builder.associated_objects[RT.DataProduct]
+ parsed_dp_id = None
+ for dp in data_products:
+ if dp.processing_level_code == "Parsed":
+ parsed_dp_id = dp._id
+ break
+ if parsed_dp_id:
+ if not self.DPMS.is_persisted(parsed_dp_id):
+ raise BadRequest("Cannot start agent - data product persistence is not activated!")
+ else:
+ log.warn("Cannot determine if persistence is activated for agent instance=%s", config_builder.agent_instance_obj._id)
+
def start_external_dataset_agent_instance(self, external_dataset_agent_instance_id=''):
"""Launch an external dataset agent instance process and return its process id.
Agent instance resource must exist and be associated with an external dataset or device and an agent definition
@@ -798,6 +815,9 @@ def start_external_dataset_agent_instance(self, external_dataset_agent_instance_
log.error('failed to launch', exc_info=True)
raise ServerError('failed to launch')
+ # Check that persistence is on
+ self._assert_persistence_on(config_builder)
+
# Save the config into an object in the object store which will be passed to the agent by the container.
config_builder.record_launch_parameters(config)
ion/services/sa/instrument/agent_configuration_builder.py (1 line changed)
@@ -555,6 +555,7 @@ def _collect_agent_instance_associations(self):
#retrieve the output products
data_product_objs = self.RR2.find_objects(device_id, PRED.hasOutputProduct, RT.DataProduct, id_only=False)
+ ret[RT.DataProduct] = data_product_objs
if not data_product_objs:
raise NotFound("No output Data Products attached to this Device " + str(device_id))
ion/services/sa/instrument/instrument_management_service.py (17 lines changed)
@@ -343,6 +343,20 @@ def record_instrument_producer_activation(self, instrument_device_id, instrument
raise
self.RR2.update(producer_obj)
+ def _assert_persistence_on(self, config_builder):
+ if not config_builder or RT.DataProduct not in config_builder.associated_objects:
+ return
+ data_products = config_builder.associated_objects[RT.DataProduct]
+ parsed_dp_id = None
+ for dp in data_products:
+ if dp.processing_level_code == "Parsed":
+ parsed_dp_id = dp._id
+ break
+ if parsed_dp_id:
+ if not self.DPMS.is_persisted(parsed_dp_id):
+ raise BadRequest("Cannot start agent - data product persistence is not activated!")
+ else:
+ log.warn("Cannot determine if persistence is activated for agent instance=%s", config_builder.agent_instance_obj._id)
def start_instrument_agent_instance(self, instrument_agent_instance_id=''):
"""
@@ -380,6 +394,9 @@ def start_instrument_agent_instance(self, instrument_agent_instance_id=''):
log.error('failed to launch', exc_info=True)
raise ServerError('failed to launch')
+ # Check that persistence is on
+ self._assert_persistence_on(config_builder)
+
# Save the config into an object in the object store which will be passed to the agent by the container.
config_builder.record_launch_parameters(config)
ion/services/sa/product/data_product_management_service.py (50 lines changed)
@@ -67,13 +67,15 @@ def create_data_product(self, data_product=None, stream_definition_id='', exchan
"""
data_product_id = self.create_data_product_(data_product)
+ # WARNING: This creates a Stream as a side effect!!
self.assign_stream_definition_to_data_product(data_product_id=data_product_id,
stream_definition_id=stream_definition_id,
exchange_point=exchange_point)
if dataset_id and parent_data_product_id:
raise BadRequest('A parent dataset or parent data product can be specified, not both.')
if dataset_id and not data_product_id:
+ # TODO: Q: How can this ever be true?
self.assign_dataset_to_data_product(data_product_id=data_product_id, dataset_id=dataset_id)
if parent_data_product_id and not dataset_id:
self.assign_data_product_to_data_product(data_product_id=data_product_id, parent_data_product_id=parent_data_product_id)
@@ -106,9 +108,10 @@ def assign_stream_definition_to_data_product(self, data_product_id='', stream_de
validate_is_not_none(data_product_id, 'A data product id must be passed to register a data product')
validate_is_not_none(stream_definition_id, 'A stream definition id must be passed to assign to a data product')
- stream_def_obj = self.clients.pubsub_management.read_stream_definition(stream_definition_id) # Validates and checks for param_dict
+
+ stream_def_obj = self.clients.pubsub_management.read_stream_definition(stream_definition_id) # Validates and checks for param_dict
parameter_dictionary = stream_def_obj.parameter_dictionary
- validate_is_not_none(parameter_dictionary , 'A parameter dictionary must be passed to register a data product')
+ validate_is_not_none(parameter_dictionary, 'A parameter dictionary must be passed to register a data product')
exchange_point = exchange_point or 'science_data'
data_product = self.RR2.read(data_product_id)
@@ -228,9 +231,9 @@ def activate_data_product_persistence(self, data_product_id=''):
@param data_product_id str
@throws NotFound object with specified id does not exist
"""
- #--------------------------------------------------------------------------------
- # retrieve the data_process object
- #--------------------------------------------------------------------------------
+ #-----------------------------------------------------------------------------------------
+ # Step 1: Collect related resources
+
data_product_obj = self.RR2.read(data_product_id)
validate_is_not_none(data_product_obj, "The data product id should correspond to a valid registered data product.")
@@ -245,20 +248,24 @@ def activate_data_product_persistence(self, data_product_id=''):
raise BadRequest("Data Product stream is without a stream definition")
stream_def_id = stream_defs[0]
-
- stream_def = self.clients.pubsub_management.read_stream_definition(stream_def_id) # additional read necessary to fill in the pdict
+ stream_def = self.clients.pubsub_management.read_stream_definition(stream_def_id) # additional read necessary to fill in the pdict
parent_data_product_ids, _ = self.clients.resource_registry.find_objects(data_product_id, predicate=PRED.hasDataProductParent, id_only=True)
- dataset_ids = []
- if len(parent_data_product_ids)==1: # This is a child data product
+ if len(parent_data_product_ids) == 1: # This is a child data product
raise BadRequest("Child Data Products shouldn't be activated")
- else:
- dataset_ids, _ = self.clients.resource_registry.find_objects(data_product_id, predicate=PRED.hasDataset, id_only=True)
+
+ child_data_product_ids, _ = self.clients.resource_registry.find_subjects(object=data_product_id, predicate=PRED.hasDataProductParent, id_only=True)
temporal_domain, spatial_domain = time_series_domain()
temporal_domain = temporal_domain.dump()
spatial_domain = spatial_domain.dump()
-
+
+ dataset_ids, _ = self.clients.resource_registry.find_objects(data_product_id, predicate=PRED.hasDataset, id_only=True)
+
+ #-----------------------------------------------------------------------------------------
+ # Step 2: Create and associate Dataset (coverage)
+
+
if not dataset_ids:
# No datasets are currently linked which means we need to create a new one
dataset_id = self.clients.dataset_management.create_dataset( name= 'dataset_%s' % stream_id,
@@ -269,6 +276,10 @@ def activate_data_product_persistence(self, data_product_id=''):
# link dataset with data product. This creates the association in the resource registry
self.RR2.assign_dataset_to_data_product_with_has_dataset(dataset_id, data_product_id)
+
+ # Late binding of dataset with existing child data products
+ for child_dp_id in child_data_product_ids:
+ self.assign_dataset_to_data_product(child_dp_id, dataset_id)
# register the dataset for externalization
@@ -282,27 +293,22 @@ def activate_data_product_persistence(self, data_product_id=''):
dataset_id = dataset_ids[0]
-
#-----------------------------------------------------------------------------------------
+ # Step 3: Configure and start ingestion with lookup values
+
# grab the ingestion configuration id from the data_product in order to use to persist it
- #-----------------------------------------------------------------------------------------
if data_product_obj.dataset_configuration_id:
ingestion_configuration_id = data_product_obj.dataset_configuration_id
else:
ingestion_configuration_id = self.clients.ingestion_management.list_ingestion_configurations(id_only=True)[0]
-
- #--------------------------------------------------------------------------------
# Identify lookup tables
- #--------------------------------------------------------------------------------
config = DotDict()
if self._has_lookup_values(data_product_id):
config.process.input_product = data_product_id
config.process.lookup_docs = self._get_lookup_documents(data_product_id)
- #--------------------------------------------------------------------------------
# persist the data stream using the ingestion config id and stream id
- #--------------------------------------------------------------------------------
# find datasets for the data product
dataset_id = self.clients.ingestion_management.persist_data_stream(stream_id=stream_id,
@@ -320,7 +326,6 @@ def activate_data_product_persistence(self, data_product_id=''):
self.update_data_product(data_product_obj)
-
def is_persisted(self, data_product_id=''):
# Is the data product currently persisted into a data set?
try:
@@ -349,7 +354,7 @@ def suspend_data_product_persistence(self, data_product_id=''):
validate_is_not_none(data_product_obj, 'Should not have been empty')
validate_is_instance(data_product_obj, DataProduct)
- parent_dp_ids, _ = self.clients.resource_registry.find_objects(data_product_id,PRED.hasDataProductParent, id_only=True)
+ parent_dp_ids, _ = self.clients.resource_registry.find_objects(data_product_id, PRED.hasDataProductParent, id_only=True)
if not data_product_obj.dataset_configuration_id:
if parent_dp_ids:
@@ -368,7 +373,8 @@ def suspend_data_product_persistence(self, data_product_id=''):
log.debug("stream found")
validate_is_not_none(stream_id, 'Data Product %s must have one stream associated' % str(data_product_id))
- self.clients.ingestion_management.unpersist_data_stream(stream_id=stream_id, ingestion_configuration_id=data_product_obj.dataset_configuration_id)
+ self.clients.ingestion_management.unpersist_data_stream(stream_id=stream_id,
+ ingestion_configuration_id=data_product_obj.dataset_configuration_id)
except NotFound:
if data_product_obj.lcstate == LCS.RETIRED:
