From e5d6c9a6ff8634e838aff0ab9b157e1d4200d149 Mon Sep 17 00:00:00 2001 From: Michael Meisinger Date: Thu, 11 Apr 2013 10:20:14 -0700 Subject: [PATCH 01/15] Enhanced instrument model OOI preload - more attributes --- ion/processes/bootstrap/ion_loader.py | 33 ++++++++++++++++++++------- ion/processes/bootstrap/ooi_loader.py | 1 + 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/ion/processes/bootstrap/ion_loader.py b/ion/processes/bootstrap/ion_loader.py index f2e0c5e04..41bf7577f 100755 --- a/ion/processes/bootstrap/ion_loader.py +++ b/ion/processes/bootstrap/ion_loader.py @@ -1055,6 +1055,7 @@ def _load_InstrumentModel(self, row): def _load_InstrumentModel_OOI(self): class_objs = self.ooi_loader.get_type_assets("class") series_objs = self.ooi_loader.get_type_assets("series") + subseries_objs = self.ooi_loader.get_type_assets("subseries") family_objs = self.ooi_loader.get_type_assets("family") makemodel_objs = self.ooi_loader.get_type_assets("makemodel") @@ -1064,31 +1065,41 @@ def _load_InstrumentModel_OOI(self): if "DEPRECATED" in class_name: continue family_obj = family_objs[class_obj['family']] + makemodel_obj = makemodel_objs[series_obj['makemodel']] if series_obj.get('makemodel', None) else None + subseries_obj = subseries_objs.get(ooi_id + "01", None) newrow = {} newrow[COL_ID] = ooi_id newrow['im/name'] = "%s (%s-%s)" % (class_name, series_obj['Class'], series_obj['Series']) newrow['im/alt_ids'] = "['OOI:" + ooi_id + "']" newrow['im/description'] = series_obj['description'] - newrow['im/instrument_family'] = family_obj['name'] + newrow['im/instrument_family'] = family_obj['name'] # DEPRECATED. Remove when UI db updated. + newrow['im/family_id'] = family_obj['id'] + newrow['im/family_name'] = family_obj['name'] + newrow['im/class_id'] = class_obj['id'] + newrow['im/class_name'] = class_obj['name'] + newrow['im/class_alternate_name'] = class_obj['Alternate Instrument Class Name'] + newrow['im/class_description'] = class_obj['description'] + newrow['im/series_id'] = series_obj['id'] + newrow['im/series_name'] = series_obj['name'] + newrow['im/subseries_id'] = subseries_obj['id'] if subseries_obj else "" + newrow['im/subseries_name'] = subseries_obj['name'] if subseries_obj else "" + newrow['im/configuration'] = subseries_obj['Instrument Configuration'] if subseries_obj else "" + newrow['im/ooi_make_model'] = makemodel_obj['name'] if makemodel_obj else "" + newrow['im/manufacturer'] = makemodel_obj['Manufacturer'] if makemodel_obj else "" + newrow['im/manufacturer_url'] = makemodel_obj['Vendor Website'] if makemodel_obj else "" newrow['im/reference_designator'] = ooi_id newrow['org_ids'] = self.ooi_loader.get_org_ids(class_obj.get('array_list', None)) reference_urls = [] addl = {} - if series_obj.get('makemodel', None): - makemodel_obj = makemodel_objs[series_obj['makemodel']] - newrow['im/manufacturer'] = makemodel_obj['Manufacturer'] - newrow['im/manufacturer_url'] = makemodel_obj['Vendor Website'] + if makemodel_obj: addl.update(dict(connector=makemodel_obj['Connector'], - makemodel=series_obj['makemodel'], makemodel_description=makemodel_obj['Make_Model_Description'], input_voltage_range=makemodel_obj['Input Voltage Range'], interface=makemodel_obj['Interface'], - makemodel_url=makemodel_obj['Make/Model Website'], output_description=makemodel_obj['Output Description'], )) if makemodel_obj['Make/Model Website']: reference_urls.append(makemodel_obj['Make/Model Website']) newrow['im/reference_urls'] = ",".join(reference_urls) - addl['alternate_name'] = 
series_obj['Alternate Instrument Class Name'] addl['class_long_name'] = series_obj['ClassLongName'] addl['comments'] = series_obj['Comments'] newrow['im/addl'] = repr(addl) @@ -1165,6 +1176,7 @@ def _load_Subsite(self, row): headers=headers) def _load_Subsite_OOI(self): + # Not needed for current OOI import. Only one level of geospatial site is used. pass def _load_PlatformSite(self, row): @@ -2208,6 +2220,11 @@ def _load_DataProduct_OOI(self): ooi_objs = self.ooi_loader.get_type_assets("instrument") data_products = self.ooi_loader.get_type_assets("data_product") + + # For each device agent + # for each stream + # create Dataset + for ooi_id, ooi_obj in ooi_objs.iteritems(): const_id1 = '' if ooi_obj['latitude'] or ooi_obj['longitude'] or ooi_obj['depth_port_max'] or ooi_obj['depth_port_min']: diff --git a/ion/processes/bootstrap/ooi_loader.py b/ion/processes/bootstrap/ooi_loader.py index da322b9cc..f510ed46f 100644 --- a/ion/processes/bootstrap/ooi_loader.py +++ b/ion/processes/bootstrap/ooi_loader.py @@ -217,6 +217,7 @@ def _parse_AttributeReportMakeModel(self, row): self._add_object_attribute('makemodel', row['Make_Model'], row['Attribute'], row['Attribute_Value'], mapping={}, + name=row['Make_Model'], Manufacturer=row['Manufacturer'], Make_Model_Description=row['Make_Model_Description']) def _parse_AttributeReportNodes(self, row): From 7fec4718b7501a4dc98acf8a8963fcaf5fc99a2b Mon Sep 17 00:00:00 2001 From: Michael Meisinger Date: Mon, 15 Apr 2013 21:29:50 -0700 Subject: [PATCH 02/15] Enhanced OOI loader --- extern/ion-definitions | 2 +- ion/core/ooiref.py | 2 +- ion/processes/bootstrap/ion_loader.py | 4 +- ion/processes/bootstrap/ooi_loader.py | 88 +++++++++++++++++++++------ 4 files changed, 76 insertions(+), 20 deletions(-) diff --git a/extern/ion-definitions b/extern/ion-definitions index 1852110c7..b7cb667a2 160000 --- a/extern/ion-definitions +++ b/extern/ion-definitions @@ -1 +1 @@ -Subproject commit 1852110c7b717a2a95ba939d6400876b80f740e4 +Subproject commit b7cb667a2267d296f90ded17f6e3644f9810bab5 diff --git a/ion/core/ooiref.py b/ion/core/ooiref.py index 86976553c..2e81ecd0e 100644 --- a/ion/core/ooiref.py +++ b/ion/core/ooiref.py @@ -40,7 +40,7 @@ def __init__(self, rdstr): self.dataproduct = rdstr else: # - - - - m = re.match('^([A-Z]{2})(?:(\d{2})(?:(\w{4})(?:-([A-Z]{2})(?:([A-Z0-9]{3})(?:-([A-Z0-9]{2})(?:-([A-Z0-9]{5})([A-Z0-9])(\d{3})?)?)?)?)?)?)?$', rdstr) + m = re.match(r'^([A-Z]{2})(?:(\d{2})(?:(\w{4})(?:-([A-Z]{2})(?:([A-Z0-9]{3})(?:-([A-Z0-9]{2})(?:-([A-Z0-9]{5})([A-Z0-9])(\d{3})?)?)?)?)?)?)?$', rdstr) if m: self.rd_type = "asset" self.array, self.site, self.subsite, self.node_type, self.node_seq, self.port, self.inst_class, self.inst_series, self.inst_seq = m.groups() diff --git a/ion/processes/bootstrap/ion_loader.py b/ion/processes/bootstrap/ion_loader.py index 41bf7577f..298b8160d 100755 --- a/ion/processes/bootstrap/ion_loader.py +++ b/ion/processes/bootstrap/ion_loader.py @@ -235,6 +235,7 @@ def on_start(self): if self.loadooi: self.ooi_loader.extract_ooi_assets() + self.ooi_loader.analyze_ooi_assets(self.ooiuntil) if self.loadui: specs_path = 'interface/ui_specs.json' if self.exportui else None self.ui_loader.load_ui(self.ui_path, specs_path=specs_path) @@ -247,7 +248,8 @@ def on_start(self): elif op == "parseooi": self.ooi_loader.extract_ooi_assets() - self.ooi_loader._analyze_ooi_assets(self.ooiuntil) + self.ooi_loader.analyze_ooi_assets(self.ooiuntil) + self.ooi_loader.report_ooi_assets() elif op == "loadui": specs_path = 'interface/ui_specs.json' if 
self.exportui else None self.ui_loader.load_ui(self.ui_path, specs_path=specs_path) diff --git a/ion/processes/bootstrap/ooi_loader.py b/ion/processes/bootstrap/ooi_loader.py index f510ed46f..ce9e674be 100644 --- a/ion/processes/bootstrap/ooi_loader.py +++ b/ion/processes/bootstrap/ooi_loader.py @@ -63,6 +63,7 @@ def extract_ooi_assets(self): 'PlatformAgents', 'Series', 'InstAgents', + 'InstAvail', ] # Holds the object representations of parsed OOI assets by type @@ -512,6 +513,20 @@ def _parse_InstAgents(self, row): self._add_object_attribute('instagent', agent_code, None, None, active=row['Active'] == "Yes") + def _parse_InstAvail(self, row): + node_code = row['Node Code'] + inst_class = row['Instrument Class'] + inst_series = row['Instrument Series'] + inst_deploy = row['First Deployment Date'] + + code = node_code + "-" + inst_class + inst_series + deploy_value = (inst_class + inst_series, inst_deploy) + + if deploy_value and re.match(r'\d+-\d+-\d+', inst_deploy): + self._add_object_attribute('node', + node_code, 'model_deploy_list', deploy_value, value_is_list=True, list_dup_ok=True) + + # ---- Post-processing and validation ---- def _perform_ooi_checks(self): @@ -613,7 +628,6 @@ def _post_process(self): pass - def get_marine_io(self, ooi_rd_str): ooi_rd = OOIReferenceDesignator(ooi_rd_str) if ooi_rd.error: @@ -686,7 +700,7 @@ def delete_ooi_assets(self): log.info("Deleted %s OOI resources and %s associations", len(del_objs), len(del_assocs)) - def _analyze_ooi_assets(self, end_date): + def analyze_ooi_assets(self, end_date): report_lines = [] node_objs = self.get_type_assets("node") inst_objs = self.get_type_assets("instrument") @@ -743,21 +757,35 @@ def _analyze_ooi_assets(self, end_date): if node_id not in isite_by_node: isite_by_node[node_id] = [] isite_by_node[node_id].append(inst_id) - inst_deploy = inst_obj.get("First Deployment Date", None) + + # Find possible override instrument deploy date from InstAvail tab + node_obj = node_objs[node_id] + node_model_dates = node_obj.get("model_deploy_list", None) + inst_deploy = None + if node_model_dates: + for iseries, idate in node_model_dates: + if iseries ==ooi_rd.series_rd: + inst_deploy = idate + break + #if not inst_deploy: + # inst_deploy = inst_obj.get("First Deployment Date", None) node_deploy = node_objs[ooi_rd.node_rd].get("deploy_date", None) deploy_date = None if inst_deploy: try: - deploy_date = datetime.datetime.strptime(inst_deploy, "%Y-%m") + deploy_date = datetime.datetime.strptime(inst_deploy, "%Y-%m-%d") except Exception as ex: try: - deploy_date = datetime.datetime.strptime(inst_deploy, "%Y") + deploy_date = datetime.datetime.strptime(inst_deploy, "%Y-%m") except Exception as ex: - pass + try: + deploy_date = datetime.datetime.strptime(inst_deploy, "%Y") + except Exception as ex: + pass if deploy_date and node_deploy: deploy_date = max(deploy_date, node_deploy) - elif node_deploy: - deploy_date = node_deploy + #elif node_deploy: + # deploy_date = node_deploy inst_obj['deploy_date'] = deploy_date or datetime.datetime(2020, 1, 1) # Set recovery mode etc in nodes and instruments @@ -778,11 +806,25 @@ def _analyze_ooi_assets(self, end_date): inst_obj['data_agent_rt'] = data_agent_rt inst_obj['data_agent_recovery'] = data_agent_recovery - print "OOI ASSET REPORT - DEPLOYMENT UNTIL", end_date.strftime('%Y-%m-%d') if end_date else "PROGRAM END" - print "Platforms by deployment date:" + report_lines.append((0, "OOI ASSET REPORT - DEPLOYMENT UNTIL %s" % end_date.strftime('%Y-%m-%d') if end_date else "PROGRAM END")) + 
report_lines.append((0, "Platforms by deployment date:")) inst_class_all = set() for ooi_obj in deploy_platform_list: inst_class_top = set() + platform_deploy_date = ooi_obj['deploy_date'] + + def follow_node_inst(node_id, level): + inst_lines = [] + for inst_id in isite_by_node.get(node_id, []): + inst_obj = inst_objs[inst_id] + deploy_date = inst_obj.get('deploy_date', datetime.datetime(2020, 1, 1)) + time_delta = platform_deploy_date - deploy_date + if abs(time_delta.days) < 40: + inst_lines.append((2, "%s +-%s %s %s" % ( + " " * level, inst_id, "IA" if inst_obj['instrument_agent_rt'] else "", "DA" if inst_obj['data_agent_rt'] else ""))) + else: + inst_lines.append((2, "%s +-%s IGNORED" % (" " * level, inst_id))) + return sorted(inst_lines) def follow_child_nodes(level, child_nodes=None): if not child_nodes: @@ -791,25 +833,37 @@ def follow_child_nodes(level, child_nodes=None): ch_obj = node_objs[ch_id] inst_class_node = set((inst, ch_obj.get('platform_agent_type', "")) for inst in inst_by_node.get(ch_id, [])) inst_class_top.update(inst_class_node) - print " "*level, " +-"+ch_obj['id'], ch_obj['name'], ch_obj.get('platform_agent_type', ""), ":", ", ".join([i for i,p in sorted(list(inst_class_node))]) + report_lines.append((1, "%s +-%s %s %s: %s" % (" "*level, ch_obj['id'], ch_obj['name'], ch_obj.get('platform_agent_type', ""), ", ".join([i for i,p in sorted(list(inst_class_node))])))) + inst_lines = follow_node_inst(ch_id, level) + report_lines.extend(inst_lines) follow_child_nodes(level+1, platform_children.get(ch_id,None)) inst_class_node = set((inst, ooi_obj.get('platform_agent_type', "")) for inst in inst_by_node.get(ooi_obj['id'], [])) inst_class_top.update(inst_class_node) - print " ", ooi_obj['deployment_start'], ooi_obj['id'], ooi_obj['name'], ooi_obj.get('platform_agent_type', ""), ":", ", ".join([i for i,p in sorted(list(inst_class_node))]) + report_lines.append((0, " %s %s %s %s: %s" % (ooi_obj['deployment_start'], ooi_obj['id'], ooi_obj['name'], ooi_obj.get('platform_agent_type', ""), ", ".join([i for i,p in sorted(list(inst_class_node))])))) + + inst_lines = follow_node_inst(ooi_obj['id'], 0) + report_lines.extend(inst_lines) follow_child_nodes(0, platform_children.get(ooi_obj['id'], None)) inst_class_all.update(inst_class_top) - print "Instrument Models:" + report_lines.append((0, "Instrument Models:")) inst_by_conn = {} for clss, conn in inst_class_all: if conn not in inst_by_conn: inst_by_conn[conn] = [] inst_by_conn[conn].append(clss) for conn, inst in inst_by_conn.iteritems(): - print " ", conn, ":", ",".join(sorted(inst)) + report_lines.append((0, " %s: %s" % (conn, ",".join(sorted(inst))))) + + self.asset_report = report_lines + + def report_ooi_assets(self, report_level=5, dump_assets=True, print_report=True): + if print_report: + print "\n".join(line for level, line in self.asset_report if level < report_level) - from ion.util.datastore.resources import ResourceRegistryHelper - rrh = ResourceRegistryHelper() - rrh.dump_dicts_as_xlsx(self.ooi_objects) + if dump_assets: + from ion.util.datastore.resources import ResourceRegistryHelper + rrh = ResourceRegistryHelper() + rrh.dump_dicts_as_xlsx(self.ooi_objects) From bb62aa6a3ad03cca620c7bc97517da52edb4ba7f Mon Sep 17 00:00:00 2001 From: Michael Meisinger Date: Tue, 16 Apr 2013 11:29:01 -0700 Subject: [PATCH 03/15] Enhanced preload - initial steps --- extern/ion-definitions | 2 +- ion/processes/bootstrap/ion_loader.py | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/extern/ion-definitions 
b/extern/ion-definitions index d4d312b63..f6bb2a4bc 160000 --- a/extern/ion-definitions +++ b/extern/ion-definitions @@ -1 +1 @@ -Subproject commit d4d312b63aca64caa980623593e7c87f49b75268 +Subproject commit f6bb2a4bc18ce77959da56ddd4602849e4ec9753 diff --git a/ion/processes/bootstrap/ion_loader.py b/ion/processes/bootstrap/ion_loader.py index 65022c3ee..e819970ab 100755 --- a/ion/processes/bootstrap/ion_loader.py +++ b/ion/processes/bootstrap/ion_loader.py @@ -145,6 +145,14 @@ 'Parser', ] +DEFINITION_CATEGORIES = [ + 'Constraint', # in memory only + 'Contact', # in memory only + 'CoordinateSystem', # in memory only + "Alerts", # in memory only + 'StreamConfiguration', # in memory only +] + COL_SCENARIO = "Scenario" COL_ID = "ID" COL_OWNER = "owner_id" From a8d517e5992eaaa387548fb5a70e44b5f9e6abc3 Mon Sep 17 00:00:00 2001 From: Michael Meisinger Date: Tue, 16 Apr 2013 12:40:46 -0700 Subject: [PATCH 04/15] Preload treats definition categories now separately --- ion/processes/bootstrap/ion_loader.py | 55 ++++++++++++++++++--------- 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/ion/processes/bootstrap/ion_loader.py b/ion/processes/bootstrap/ion_loader.py index e819970ab..1c0e74abd 100755 --- a/ion/processes/bootstrap/ion_loader.py +++ b/ion/processes/bootstrap/ion_loader.py @@ -102,17 +102,17 @@ # The preload spreadsheets (tabs) in the order they should be loaded DEFAULT_CATEGORIES = [ - 'Constraint', # in memory only - 'Contact', # in memory only + 'Constraint', # in memory only - all scenarios loaded + 'Contact', # in memory only - all scenarios loaded 'User', 'Org', 'UserRole', # association only - 'CoordinateSystem', # in memory only + 'CoordinateSystem', # in memory only - all scenarios loaded 'ParameterFunctions', 'ParameterDefs', 'ParameterDictionary', - "Alerts", # in memory only - 'StreamConfiguration', # in memory only + 'Alerts', # in memory only - all scenarios loaded + 'StreamConfiguration', # in memory only - all scenarios loaded 'SensorModel', 'PlatformModel', 'InstrumentModel', @@ -145,14 +145,19 @@ 'Parser', ] +# The following lists all categories that define information used by other categories. +# A definition in these categories has no persistent side effect on the system. 
DEFINITION_CATEGORIES = [ - 'Constraint', # in memory only - 'Contact', # in memory only - 'CoordinateSystem', # in memory only - "Alerts", # in memory only - 'StreamConfiguration', # in memory only + 'Constraint', + 'Contact', + 'CoordinateSystem', + 'Alerts', + 'StreamConfiguration', ] +# The following lists the scenarios that are always ignored +IGNORE_SCENARIOS = ["", "DOC", "DOC:README", "STOP!", "X"] + COL_SCENARIO = "Scenario" COL_ID = "ID" COL_OWNER = "owner_id" @@ -167,7 +172,6 @@ class IONLoader(ImmediateProcess): - def __init__(self,*a, **b): super(IONLoader,self).__init__(*a,**b) @@ -341,7 +345,8 @@ def _select_rows(self, reader, category, scenarios): row_skip = row_do = 0 rows = [] for row in reader: - if any(sc in scenarios for sc in row[COL_SCENARIO].split(",")): + if (category in DEFINITION_CATEGORIES and any(sc not in IGNORE_SCENARIOS for sc in row[COL_SCENARIO].split(","))) \ + or any(sc in scenarios for sc in row[COL_SCENARIO].split(",")): row_do += 1 rows.append(row) else: @@ -813,8 +818,10 @@ def _create_association(self, subject=None, predicate=None, obj=None): # Add specific types of resources below def _load_Contact(self, row): - """ create constraint IonObject but do not insert into DB, - cache in dictionary for inclusion in other preload objects """ + """ + DEFINITION category. Load and keep IonObject for reference by other categories. No side effects. + Keeps contact information objects. + """ cont_id = row[COL_ID] log.trace('creating contact: ' + cont_id) if cont_id in self.contact_defs: @@ -845,8 +852,10 @@ def _load_Contact_OOI(self): self._load_Contact(controw) def _load_Constraint(self, row): - """ create constraint IonObject but do not insert into DB, - cache in dictionary for inclusion in other preload objects """ + """ + DEFINITION category. Load and keep IonObject for reference by other categories. No side effects. + Keeps geospatial/temporal constraints + """ const_id = row[COL_ID] if const_id in self.constraint_defs: raise iex.BadRequest('constraint with ID already exists: ' + const_id) @@ -859,6 +868,10 @@ def _load_Constraint(self, row): raise iex.BadRequest('constraint type must be either geospatial or temporal, not ' + const_type) def _load_CoordinateSystem(self, row): + """ + DEFINITION category. Load and keep IonObject for reference by other categories. No side effects. + Keeps coordinate system definition objects. + """ gcrs = self._create_object_from_row("GeospatialCoordinateReferenceSystem", row, "m/") cs_id = row[COL_ID] self.resource_ids[cs_id] = gcrs @@ -1845,6 +1858,10 @@ def _parse_alert_range(self, expression): return out def _load_Alerts(self, row): + """ + DEFINITION category. Load and keep object for reference by other categories. No side effects. + Keeps alert definition dicts. + """ # alert is just a dict alert = { 'name': row['name'], @@ -1859,7 +1876,11 @@ def _load_Alerts(self, row): self.alerts[row[COL_ID]] = alert def _load_StreamConfiguration(self, row): - """ parse and save for use in *AgentInstance objects """ + """ + DEFINITION category. Load and keep IonObject for reference by other categories. No side effects. + Keeps stream configuration object for use in *AgentInstance categories. 
+ """ + # alerts = [] # if row['alerts']: # for id in row['alerts'].split(','): From 2f4561c625d4dbef4fa1836f70c479755d4f3bd7 Mon Sep 17 00:00:00 2001 From: Michael Meisinger Date: Tue, 16 Apr 2013 13:17:13 -0700 Subject: [PATCH 05/15] Added ability to script preload runs --- extern/ion-definitions | 2 +- ion/processes/bootstrap/ion_loader.py | 133 +++++++++++++++----------- 2 files changed, 78 insertions(+), 57 deletions(-) diff --git a/extern/ion-definitions b/extern/ion-definitions index f6bb2a4bc..d7e0ca5a5 160000 --- a/extern/ion-definitions +++ b/extern/ion-definitions @@ -1 +1 @@ -Subproject commit f6bb2a4bc18ce77959da56ddd4602849e4ec9753 +Subproject commit d7e0ca5a52d51e79e11572bd469b5ffb705ed9da diff --git a/ion/processes/bootstrap/ion_loader.py b/ion/processes/bootstrap/ion_loader.py index 1c0e74abd..6583d6915 100755 --- a/ion/processes/bootstrap/ion_loader.py +++ b/ion/processes/bootstrap/ion_loader.py @@ -1,44 +1,42 @@ #!/usr/bin/env python """Process that loads ION resources via service calls based on definitions in spreadsheets using loader functions. - @see https://confluence.oceanobservatories.org/display/CIDev/R2+System+Preload - bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path=master scenario=R2_DEMO - bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path=res/preload/r2_ioc/R2PreloadedResources.xlsx scenario=R2_DEMO - - bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path="https://docs.google.com/spreadsheet/pub?key=0AttCeOvLP6XMdG82NHZfSEJJOGdQTkgzb05aRjkzMEE&output=xls" scenario=R2_DEMO - bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path=res/preload/r2_ioc scenario=R2_DEMO - - bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=loadui path=res/preload/r2_ioc - bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=loadui path=https://userexperience.oceanobservatories.org/database-exports/ - - bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path=master assets=res/preload/r2_ioc/ooi_assets scenario=R2_DEMO loadooi=True - bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path=res/preload/r2_ioc scenario=R2_DEMO loadooi=True assets=res/preload/r2_ioc/ooi_assets - bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path=res/preload/r2_ioc scenario=R2_DEMO loadui=True - - bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=parseooi assets=res/preload/r2_ioc/ooi_assets - - bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=deleteooi - - ui_path= override location to get UI preload files (default is path + '/ui_assets') - assets= override location to get OOI asset file (default is path + '/ooi_assets') - attachments= override location to get file attachments (default is path) - ooifilter= one or comma separated list of CE,CP,GA,GI,GP,GS,ES to limit ooi resource import - ooiexclude= one or more categories to NOT import in the OOI import - ooiuntil= datetime of latest planned deployment date to consider for data product etc import mm/dd/yyyy - bulk= if True, uses RR bulk insert operations to load, not service calls - debug= if True, allows a few shortcuts to perform faster loads - exportui= if True, writes interface/ui_specs.json with UI object - - TODO: constraints defined in multiple tables as list of IDs, but not used - TODO: support attachments using HTTP URL - TODO: Owner, Events with bulk - - Note: For quick debugging without restarting the services container: - - Once after starting r2deploy: - bin/pycc -x 
ion.processes.bootstrap.datastore_loader.DatastoreLoader op=dump path=res/preload/local/my_dump - - Before each test of the ion_loader: - bin/pycc -fc -x ion.processes.bootstrap.datastore_loader.DatastoreLoader op=load path=res/preload/local/my_dump + + Examples: + bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path=master scenario=R2_DEMO + bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path=res/preload/r2_ioc/R2PreloadedResources.xlsx scenario=R2_DEMO + bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path="https://docs.google.com/spreadsheet/pub?key=0AttCeOvLP6XMdG82NHZfSEJJOGdQTkgzb05aRjkzMEE&output=xls" scenario=R2_DEMO + bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path=res/preload/r2_ioc scenario=R2_DEMO + + bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=loadui path=res/preload/r2_ioc + bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=loadui path=https://userexperience.oceanobservatories.org/database-exports/ + + bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path=master assets=res/preload/r2_ioc/ooi_assets scenario=R2_DEMO loadooi=True + bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path=res/preload/r2_ioc scenario=R2_DEMO loadooi=True assets=res/preload/r2_ioc/ooi_assets + bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=load path=res/preload/r2_ioc scenario=R2_DEMO loadui=True + + bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader cfg=res/preload/r2_ioc/config/ooi_load_config.yml + + bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=parseooi assets=res/preload/r2_ioc/ooi_assets + bin/pycc -x ion.processes.bootstrap.ion_loader.IONLoader op=deleteooi + + Options: + ui_path= override location to get UI preload files (default is path + '/ui_assets') + assets= override location to get OOI asset file (default is path + '/ooi_assets') + attachments= override location to get file attachments (default is path) + ooifilter= one or comma separated list of CE,CP,GA,GI,GP,GS,ES to limit ooi resource import + ooiexclude= one or more categories to NOT import in the OOI import + ooiuntil= datetime of latest planned deployment date to consider for data product etc import mm/dd/yyyy + bulk= if True, uses RR bulk insert operations to load, not service calls + debug= if True, allows a few shortcuts to perform faster loads + exportui= if True, writes interface/ui_specs.json with UI object + cfg= Path to a preload config file that allows scripted preload runs with defined params + + TODO: + support attachments using HTTP URL + Owner, Events with bulk + Set lifecycle state through appropriate service operations """ __author__ = 'Michael Meisinger, Ian Katz, Thomas Lennan, Jonathan Newbrough' @@ -49,6 +47,7 @@ import datetime import ast import calendar +import copy import csv import re import requests @@ -61,7 +60,8 @@ from pyon.ion.identifier import create_unique_resource_id, create_unique_association_id from pyon.ion.resource import get_restype_lcsm from pyon.public import log, ImmediateProcess, iex, IonObject, RT, PRED, OT, LCS, AS -from pyon.util.containers import get_ion_ts, named_any +from pyon.util.containers import get_ion_ts, named_any, dict_merge +from pyon.util.config import Config from ion.agents.port.port_agent_process import PortAgentProcessType, PortAgentType from ion.core.ooiref import OOIReferenceDesignator @@ -106,7 +106,7 @@ 'Contact', # in memory only - all scenarios loaded 'User', 'Org', - 'UserRole', # association only + 
'UserRole', # no resource - association only 'CoordinateSystem', # in memory only - all scenarios loaded 'ParameterFunctions', 'ParameterDefs', @@ -137,7 +137,7 @@ 'TransformFunction', 'DataProcessDefinition', 'DataProcess', - 'DataProductLink', # association only + 'DataProductLink', # no resource but complex service call 'Attachment', 'WorkflowDefinition', 'Workflow', @@ -193,38 +193,59 @@ def __init__(self,*a, **b): self.rpc_sender = self def on_start(self): + cfg = self.CFG.get("cfg", None) + if cfg: + log.warn("ION loader scripted mode using config file: %s", cfg) + self.preload_cfg = Config([cfg]).data + load_sequence = self.preload_cfg["load_sequence"] + for num, step_cfg in enumerate(load_sequence): + log.info("Executing preload step %s '%s'", num, step_cfg['name']) + docstr = step_cfg.get("docstring", None) + if docstr: + log.debug("Explanation: "+ docstr) + step_config_override = step_cfg.get("config", {}) + log.debug("Step config override: %s", step_config_override) + step_config = copy.deepcopy(self.CFG) + dict_merge(step_config, step_config_override, inplace=True) + self._do_preload(step_config) + log.info("-------------------------- Completed step '%s' --------------------------", step_cfg['name']) + else: + self.preload_cfg = None + self._do_preload(self.CFG) + + def _do_preload(self, config): # Main operation to perform - op = self.CFG.get("op", None) + op = config.get("op", None) # Additional parameters - self.path = self.CFG.get("path", TESTED_DOC) + self.path = config.get("path", TESTED_DOC) if self.path=='master': self.path = MASTER_DOC - self.attachment_path = self.CFG.get("attachments", self.path + '/attachments') - self.asset_path = self.CFG.get("assets", self.path + "/ooi_assets") + self.attachment_path = config.get("attachments", self.path + '/attachments') + self.asset_path = config.get("assets", self.path + "/ooi_assets") default_ui_path = self.path if self.path.startswith('http') else self.path + "/ui_assets" - self.ui_path = self.CFG.get("ui_path", default_ui_path) + self.ui_path = config.get("ui_path", default_ui_path) if self.ui_path=='default': self.ui_path = TESTED_UI_ASSETS elif self.ui_path=='candidate': self.ui_path = CANDIDATE_UI_ASSETS - scenarios = self.CFG.get("scenario", None) - category_csv = self.CFG.get("categories", None) + scenarios = config.get("scenario", None) + category_csv = config.get("categories", None) self.categories = category_csv.split(",") if category_csv else DEFAULT_CATEGORIES - self.debug = self.CFG.get("debug", False) # Debug mode with certain shorthands - self.loadooi = self.CFG.get("loadooi", False) # Import OOI asset data - self.loadui = self.CFG.get("loadui", False) # Import UI asset data - self.exportui = self.CFG.get("exportui", False) # Save UI JSON file - self.update = self.CFG.get("update", False) # Support update to existing resources - self.bulk = self.CFG.get("bulk", False) # Use bulk insert where available - self.ooifilter = self.CFG.get("ooifilter", None) # Filter OOI import to RD prefixes (e.g. 
array "CE,GP") - self.ooiexclude = self.CFG.get("ooiexclude", '') # Don't import the listed categories + self.debug = config.get("debug", False) # Debug mode with certain shorthands + self.loadooi = config.get("loadooi", False) # Import OOI asset data + self.loadui = config.get("loadui", False) # Import UI asset data + self.exportui = config.get("exportui", False) # Save UI JSON file + self.update = config.get("update", False) # Support update to existing resources + self.bulk = config.get("bulk", False) # Use bulk insert where available + self.ooifilter = config.get("ooifilter", None) # Filter OOI import to RD prefixes (e.g. array "CE,GP") + self.ooiexclude = config.get("ooiexclude", '') # Don't import the listed categories if self.ooiexclude: self.ooiexclude = self.ooiexclude.split(',') - self.ooiuntil = self.CFG.get("ooiuntil", None) # Don't import stuff later than given date + self.ooiuntil = config.get("ooiuntil", None) # Don't import stuff later than given date if self.ooiuntil: self.ooiuntil = datetime.datetime.strptime(self.ooiuntil, "%m/%d/%Y") From c941defef0a222733c36ab5d769d32e88f7f4ba9 Mon Sep 17 00:00:00 2001 From: Edward Hunter Date: Tue, 16 Apr 2013 16:03:11 -0700 Subject: [PATCH 06/15] Fix alert eval bug. --- ion/agents/alerts/alerts.py | 4 ++-- ion/agents/instrument/instrument_agent.py | 14 ++++++++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/ion/agents/alerts/alerts.py b/ion/agents/alerts/alerts.py index c6e592d55..7e0fcc3ae 100644 --- a/ion/agents/alerts/alerts.py +++ b/ion/agents/alerts/alerts.py @@ -159,7 +159,7 @@ def get_status(self): status['upper_rel_op'] = self._upper_rel_op return status - def eval_alert(self, stream_name=None, value=None, value_id=None): + def eval_alert(self, stream_name=None, value=None, value_id=None, **kwargs): if stream_name != self._stream_name or value_id != self._value_id \ or not value: @@ -309,7 +309,7 @@ def get_status(self): status['time_delta'] = self._time_delta return status - def eval_alert(self, stream_name=None): + def eval_alert(self, stream_name=None, **kwargs): if stream_name != self._stream_name: return diff --git a/ion/agents/instrument/instrument_agent.py b/ion/agents/instrument/instrument_agent.py index 2908c6a2a..79ee828b0 100644 --- a/ion/agents/instrument/instrument_agent.py +++ b/ion/agents/instrument/instrument_agent.py @@ -814,6 +814,20 @@ def _async_driver_event_sample(self, val, ts): values : [{u'value_id': u'temp', u'value': 19.0612}, {u'value_id': u'conductivity', u'value': 3.33791}, {u'value_id': u'pressure', u'value': 449.005}] + + u'quality_flag': u'ok', + u'preferred_timestamp': u'port_timestamp', + u'stream_name': u'raw', + u'port_timestamp': 3575139438.357514, + u'pkt_format_id': u'JSON_Data', + u'pkt_version': 1, + u'values': [ + {u'binary': True, u'value_id': u'raw', u'value': u'aABlAGEAcgB0AGIAZQBhAHQAXwBpAG4AdABlAHIAdgBhAGwAIAAwAA=='}, + {u'value_id': u'length', u'value': 40}, + {u'value_id': u'type', u'value': 1}, + {u'value_id': u'checksum', u'value': None} + ], + u'driver_timestamp': 3575139438.206242 """ # If the sample event is encoded, load it back to a dict. 
From d290e65178a4caece9dfb34f175f166e5fa807c3 Mon Sep 17 00:00:00 2001 From: Michael Meisinger Date: Tue, 16 Apr 2013 18:39:28 -0700 Subject: [PATCH 07/15] OOI preload data product improvements --- extern/ion-definitions | 2 +- ion/processes/bootstrap/datastore_loader.py | 2 +- ion/processes/bootstrap/ion_loader.py | 122 +++++++++++++++----- ion/processes/bootstrap/ooi_loader.py | 9 +- ion/util/geo_utils.py | 12 +- ion/util/test/test_geo_utils.py | 4 +- 6 files changed, 111 insertions(+), 40 deletions(-) diff --git a/extern/ion-definitions b/extern/ion-definitions index d7e0ca5a5..daf7b0dfc 160000 --- a/extern/ion-definitions +++ b/extern/ion-definitions @@ -1 +1 @@ -Subproject commit d7e0ca5a52d51e79e11572bd469b5ffb705ed9da +Subproject commit daf7b0dfc8963f94fb49a8dc9f618b4a67458a98 diff --git a/ion/processes/bootstrap/datastore_loader.py b/ion/processes/bootstrap/datastore_loader.py index 157ae856e..6e8073117 100644 --- a/ion/processes/bootstrap/datastore_loader.py +++ b/ion/processes/bootstrap/datastore_loader.py @@ -24,7 +24,7 @@ class DatastoreAdmin(ImmediateProcess): bin/pycc -x ion.processes.bootstrap.datastore_loader.DatastoreLoader op=clear prefix=ion bin/pycc -x ion.processes.bootstrap.datastore_loader.DatastoreLoader op=dump path=res/preload/local/my_dump bin/pycc -fc -x ion.processes.bootstrap.datastore_loader.DatastoreLoader op=load path=res/preload/local/my_dump - bin/pycc -fc -x ion.processes.bootstrap.datastore_loader.DatastoreLoader op=dumpres + bin/pycc -x ion.processes.bootstrap.datastore_loader.DatastoreLoader op=dumpres """ def on_init(self): pass diff --git a/ion/processes/bootstrap/ion_loader.py b/ion/processes/bootstrap/ion_loader.py index 6583d6915..27181cd58 100755 --- a/ion/processes/bootstrap/ion_loader.py +++ b/ion/processes/bootstrap/ion_loader.py @@ -68,7 +68,8 @@ from ion.processes.bootstrap.ooi_loader import OOILoader from ion.processes.bootstrap.ui_loader import UILoader from ion.services.dm.utility.granule_utils import time_series_domain -from ion.services.dm.utility.types import TypesManager +from ion.services.dm.utility.types import TypesManager +from ion.util.geo_utils import GeoUtils from ion.util.parse_utils import parse_dict, parse_phones, get_typed_value from ion.util.xlsparser import XLSParser @@ -172,8 +173,8 @@ class IONLoader(ImmediateProcess): - def __init__(self,*a, **b): - super(IONLoader,self).__init__(*a,**b) + def __init__(self, *a, **b): + super(IONLoader, self).__init__(*a,**b) # initialize these here instead of on_start # to support using IONLoader as a utility -- not just as a process @@ -214,7 +215,11 @@ def on_start(self): self._do_preload(self.CFG) def _do_preload(self, config): - # Main operation to perform + """ + One "run" of preload with one set of config arguments. + """ + + # Main operation to perform this run. 
op = config.get("op", None) # Additional parameters @@ -231,20 +236,7 @@ def _do_preload(self, config): elif self.ui_path=='candidate': self.ui_path = CANDIDATE_UI_ASSETS - scenarios = config.get("scenario", None) - category_csv = config.get("categories", None) - self.categories = category_csv.split(",") if category_csv else DEFAULT_CATEGORIES - self.debug = config.get("debug", False) # Debug mode with certain shorthands - self.loadooi = config.get("loadooi", False) # Import OOI asset data - self.loadui = config.get("loadui", False) # Import UI asset data - self.exportui = config.get("exportui", False) # Save UI JSON file - self.update = config.get("update", False) # Support update to existing resources - self.bulk = config.get("bulk", False) # Use bulk insert where available - self.ooifilter = config.get("ooifilter", None) # Filter OOI import to RD prefixes (e.g. array "CE,GP") - self.ooiexclude = config.get("ooiexclude", '') # Don't import the listed categories - if self.ooiexclude: - self.ooiexclude = self.ooiexclude.split(',') self.ooiuntil = config.get("ooiuntil", None) # Don't import stuff later than given date if self.ooiuntil: self.ooiuntil = datetime.datetime.strptime(self.ooiuntil, "%m/%d/%Y") @@ -254,17 +246,31 @@ def _do_preload(self, config): self.ooi_loader = OOILoader(self, asset_path=self.asset_path) self.resource_ds = DatastoreManager.get_datastore_instance(DataStore.DS_RESOURCES, DataStore.DS_PROFILE.RESOURCES) - # Loads internal bootstrapped resource ids that will be referenced during preload - self._load_system_ids() - - log.info("IONLoader: {op=%s, path=%s, scenario=%s}" % (op, self.path, scenarios)) + log.info("IONLoader: {op=%s, path=%s}", op, self.path) if not op: raise iex.BadRequest("No operation specified") # Perform operations if op == "load": + scenarios = config.get("scenario", None) if not scenarios: raise iex.BadRequest("Must provide scenarios to load: scenario=sc1,sc2,...") + log.debug("Scenarios: %s", scenarios) + + category_csv = config.get("categories", None) + self.categories = category_csv.split(",") if category_csv else DEFAULT_CATEGORIES + + self.loadooi = config.get("loadooi", False) # Import OOI asset data + self.loadui = config.get("loadui", False) # Import UI asset data + self.exportui = config.get("exportui", False) # Save UI JSON file + self.update = config.get("update", False) # Support update to existing resources + self.bulk = config.get("bulk", False) # Use bulk insert where available + self.ooifilter = config.get("ooifilter", None) # Filter OOI import to RD prefixes (e.g. 
array "CE,GP") + self.ooiexclude = config.get("ooiexclude", '') # Don't import the listed categories + if self.ooiexclude: + self.ooiexclude = self.ooiexclude.split(',') + + if self.loadooi: self.ooi_loader.extract_ooi_assets() @@ -273,6 +279,9 @@ def _do_preload(self, config): specs_path = 'interface/ui_specs.json' if self.exportui else None self.ui_loader.load_ui(self.ui_path, specs_path=specs_path) + # Loads internal bootstrapped resource ids that will be referenced during preload + self._load_system_ids() + # Load existing resources by preload ID self._prepare_incremental() @@ -283,15 +292,22 @@ def _do_preload(self, config): self.ooi_loader.extract_ooi_assets() self.ooi_loader.analyze_ooi_assets(self.ooiuntil) self.ooi_loader.report_ooi_assets() + + elif op == "deleteooi": + if self.debug: + self.ooi_loader.delete_ooi_assets() + else: + raise iex.BadRequest("deleteooi not allowed if debug==False") + elif op == "loadui": specs_path = 'interface/ui_specs.json' if self.exportui else None self.ui_loader.load_ui(self.ui_path, specs_path=specs_path) - elif op == "deleteooi": - self.ooi_loader.delete_ooi_assets() + elif op == "deleteui": self.ui_loader.delete_ui() + else: - raise iex.BadRequest("Operation unknown") + raise iex.BadRequest("Operation unknown: %s" % op) def on_quit(self): pass @@ -1153,6 +1169,14 @@ def _load_InstrumentModel_OOI(self): self._load_InstrumentModel(newrow) + def _calc_geospatial_point_center(self, site): + siteTypes = [RT.Site, RT.Subsite, RT.Observatory, RT.PlatformSite, RT.InstrumentSite, RT.Deployment] + if site and site.type_ in siteTypes: + # if the geospatial_bounds is set then calculate the geospatial_point_center + for constraint in site.constraint_list: + if constraint.type_ == OT.GeospatialBounds: + site.geospatial_point_center = GeoUtils.calc_geospatial_point_center(constraint) + def _load_Observatory(self, row): constraints = self._get_constraints(row, type='Observatory') coordinate_name = row['coordinate_system'] @@ -1163,6 +1187,10 @@ def _load_Observatory(self, row): set_attributes=dict(coordinate_reference_system=self.resource_ids[coordinate_name]) if coordinate_name else None, support_bulk=True) + if self.bulk: + fofr_obj = self._get_resource_obj(res_id) + self._calc_geospatial_point_center(fofr_obj) + def _load_Observatory_OOI(self): ooi_objs = self.ooi_loader.get_type_assets("ssite") for ooi_id, ooi_obj in ooi_objs.iteritems(): @@ -1207,6 +1235,10 @@ def _load_Subsite(self, row): set_attributes=dict(coordinate_reference_system=self.resource_ids[coordinate_name]) if coordinate_name else None, support_bulk=True) + if self.bulk: + fofr_obj = self._get_resource_obj(res_id) + self._calc_geospatial_point_center(fofr_obj) + headers = self._get_op_headers(row) psite_id = row.get("parent_site_id", None) if psite_id: @@ -1233,6 +1265,10 @@ def _load_PlatformSite(self, row): set_attributes=dict(coordinate_reference_system=self.resource_ids[coordinate_name]) if coordinate_name else None, support_bulk=True) + if self.bulk: + fofr_obj = self._get_resource_obj(res_id) + self._calc_geospatial_point_center(fofr_obj) + svc_client = self._get_service_client("observatory_management") headers = self._get_op_headers(row) @@ -1267,7 +1303,7 @@ def _load_PlatformSite_OOI(self): def _load_platform(ooi_id, ooi_obj): ooi_rd = OOIReferenceDesignator(ooi_id) const_id1 = '' - if ooi_obj.get('latitude',None) or ooi_obj.get('longitude',None) or ooi_obj.get('depth_subsite',None): + if ooi_obj.get('latitude', None) or ooi_obj.get('longitude', None) or 
ooi_obj.get('depth_subsite', None): const_id1 = ooi_id + "_const1" constrow = {} constrow[COL_ID] = const_id1 @@ -1347,6 +1383,10 @@ def _load_InstrumentSite(self, row): set_attributes=dict(coordinate_reference_system=self.resource_ids[coordinate_name]) if coordinate_name else None, support_bulk=True) + if self.bulk: + fofr_obj = self._get_resource_obj(res_id) + self._calc_geospatial_point_center(fofr_obj) + svc_client = self._get_service_client("observatory_management") headers = self._get_op_headers(row) @@ -2266,6 +2306,10 @@ def _load_DataProduct(self, row, do_bulk=False): # This is a non-functional, diminished version of a DataProduct, just to show up in lists res_id = self._create_bulk_resource(res_obj, row[COL_ID]) self._resource_assign_owner(headers, res_obj) + + if res_obj.geospatial_bounds: + res_obj.geospatial_point_center = GeoUtils.calc_geospatial_point_center(res_obj.geospatial_bounds) + # Create and associate Stream # Create and associate Dataset else: @@ -2304,6 +2348,7 @@ def _load_DataProduct_OOI(self): newrow[COL_ID] = ooi_id + "_DPIDP" newrow['dp/name'] = "Data Product parsed for device " + ooi_id newrow['dp/description'] = "Data Product (device, parsed) for: " + ooi_id + newrow['dp/ooi_product_name'] = "Raw" newrow['org_ids'] = self.ooi_loader.get_org_ids([ooi_id[:2]]) newrow['contact_ids'] = '' newrow['geo_constraint_id'] = const_id1 @@ -2317,6 +2362,7 @@ def _load_DataProduct_OOI(self): newrow[COL_ID] = ooi_id + "_DPIDR" newrow['dp/name'] = "Data Product raw for device " + ooi_id newrow['dp/description'] = "Data Product (device, raw) for: " + ooi_id + newrow['dp/ooi_product_name'] = "Parsed" newrow['org_ids'] = self.ooi_loader.get_org_ids([ooi_id[:2]]) newrow['contact_ids'] = '' newrow['geo_constraint_id'] = const_id1 @@ -2325,18 +2371,35 @@ def _load_DataProduct_OOI(self): newrow['stream_def_id'] = '' self._load_DataProduct(newrow, do_bulk=self.bulk) - # (3) Site Data Product - parsed data_product_list = ooi_obj.get('data_product_list', []) for dp_id in data_product_list: dp_obj = data_products[dp_id] - # (4*) Site Data Product DPS - Level + # (3*) Site Data Product DPS - Level newrow = {} newrow[COL_ID] = ooi_id + "_" + dp_id + "_DPID" newrow['dp/name'] = "%s %s at %s" % (dp_obj['name'], dp_obj['level'], ooi_id) - - #"Data Product for site " + ooi_id + " DPS " + dp_id newrow['dp/description'] = "Data Product DPS %s level %s for site %s: " % (dp_id, dp_obj['level'], ooi_id) + newrow['dp/ooi_short_name'] = dp_obj['code'] + newrow['dp/ooi_product_name'] = dp_obj['name'] + newrow['dp/processing_level_code'] = dp_obj['level'] + newrow['dp/regime'] = dp_obj.get('regime', "") + newrow['dp/qc_cmbnflg'] = dp_obj.get('Combine QC Flags (CMBNFLG) QC', "") + newrow['dp/qc_condcmp'] = dp_obj.get('Conductivity Compressibility Compensation (CONDCMP) QC', "") + newrow['dp/qc_glblrng'] = dp_obj.get('Global Range Test (GLBLRNG) QC', "") + newrow['dp/qc_gradtst'] = dp_obj.get('Gradient Test (GRADTST) QC', "") + newrow['dp/qc_interp1'] = dp_obj.get('1-D Interpolation (INTERP1) QC', "") + newrow['dp/qc_loclrng'] = dp_obj.get('Local Range Test (LOCLRNG) QC', "") + newrow['dp/qc_modulus'] = dp_obj.get('Modulus (MODULUS) QC', "") + newrow['dp/qc_polyval'] = dp_obj.get('Evaluate Polynomial (POLYVAL) QC', "") + newrow['dp/qc_solarel'] = dp_obj.get('Solar Elevation (SOLAREL) QC', "") + newrow['dp/qc_spketest'] = dp_obj.get('Spike Test (SPKETST) QC', "") + newrow['dp/qc_stuckvl'] = dp_obj.get('Stuck Value Test (STUCKVL) QC', "") + newrow['dp/qc_trndtst'] = dp_obj.get('Trend Test (TRNDTST) QC', 
"") + newrow['dp/dps_dcn'] = dp_obj.get('DPS DCN(s)', "") + newrow['dp/flow_diagram_dcn'] = dp_obj.get('Processing Flow Diagram DCN(s)', "") + newrow['dp/doors_l2_requirement_num'] = dp_obj.get('DOORS L2 Science Requirement #(s)', "") + newrow['dp/doors_l2_requirement_text'] = dp_obj.get('DOORS L2 Science Requirement Text', "") newrow['org_ids'] = self.ooi_loader.get_org_ids([ooi_id[:2]]) newrow['contact_ids'] = '' newrow['geo_constraint_id'] = const_id1 @@ -2496,7 +2559,6 @@ def _load_Deployment(self,row): set_attributes={"coordinate_reference_system": self.resource_ids[coordinate_name] if coordinate_name else None, "context": context}) - device_id = self.resource_ids[row['device_id']] site_id = self.resource_ids[row['site_id']] diff --git a/ion/processes/bootstrap/ooi_loader.py b/ion/processes/bootstrap/ooi_loader.py index ce9e674be..29e11a90c 100644 --- a/ion/processes/bootstrap/ooi_loader.py +++ b/ion/processes/bootstrap/ooi_loader.py @@ -20,12 +20,16 @@ def __init__(self, process, container=None, asset_path=None): self.process = process self.container = container or self.process.container self.asset_path = asset_path + self._extracted = False def extract_ooi_assets(self): """ Parses SAF Instrument Application export CSV files into intermediate memory structures. This information can later be loaded in to actual load_ion() function. """ + if self._extracted: + return + if not self.asset_path: raise iex.BadRequest("Must provide path for assets: path=dir or assets=dir") if self.asset_path.startswith('http'): @@ -125,6 +129,8 @@ def extract_ooi_assets(self): #print ot #print "\n".join(sorted(list(self.ooi_obj_attrs[ot]))) + self._extracted = True + def get_type_assets(self, objtype): return self.ooi_objects.get(objtype, None) @@ -205,7 +211,7 @@ def _parse_AttributeReportDataProducts(self, row): return self._add_object_attribute('data_product_type', row['Data_Product_Identifier'], row['Attribute'], row['AttributeValue'], - mapping={}, + mapping={'Regime(s)':'regime'}, Data_Product_Name=row['Data_Product_Name'], Data_Product_Level=row['Data_Product_Level']) def _parse_AttributeReportFamilies(self, row): @@ -370,6 +376,7 @@ def _parse_DataProductSpreadsheet(self, row): entry.pop("id", None) entry.update(dict( name=row['Data_Product_Name'].strip(), + code=dp_type, level=row['Data_Product_Level1'].strip(), units=row['Units'].strip(), dps=row['DPS_DCN_s_'].strip(), diff --git a/ion/util/geo_utils.py b/ion/util/geo_utils.py index 9531bf5f7..38173e2b5 100644 --- a/ion/util/geo_utils.py +++ b/ion/util/geo_utils.py @@ -11,12 +11,12 @@ class GeoUtils(object): @classmethod def calc_geospatial_point_center(cls, geospatial_bounds=None, distance=None): if not geospatial_bounds: - raise BadRequest ("Geospatial bounds data is not set correctly") + raise BadRequest("Geospatial bounds data is not set correctly") if (geospatial_bounds.geospatial_latitude_limit_north < -90 or geospatial_bounds.geospatial_latitude_limit_north > 90 or geospatial_bounds.geospatial_latitude_limit_south < -90 or geospatial_bounds.geospatial_latitude_limit_south > 90 or geospatial_bounds.geospatial_longitude_limit_east < -180 or geospatial_bounds.geospatial_longitude_limit_east > 180 or geospatial_bounds.geospatial_longitude_limit_west < -180 or geospatial_bounds.geospatial_longitude_limit_west > 180): - raise BadRequest ("Geospatial bounds data out of range") + raise BadRequest("Geospatial bounds data out of range") geo_index_obj = IonObject(OT.GeospatialIndex) west_lon = geospatial_bounds.geospatial_longitude_limit_west @@ -25,7 
+25,7 @@ def calc_geospatial_point_center(cls, geospatial_bounds=None, distance=None): south_lat = geospatial_bounds.geospatial_latitude_limit_south if not distance: if (west_lon >= 0 and east_lon >= 0) or (west_lon <= 0 and east_lon <= 0): # Same hemisphere - if west_lon < east_lon: # West is "to the left" of East + if west_lon <= east_lon: # West is "to the left" of East distance = GeoUtils.DISTANCE_SHORTEST else: # West is "to the right" of East distance = GeoUtils.DISTANCE_LONGEST @@ -42,9 +42,9 @@ def calc_geospatial_point_center(cls, geospatial_bounds=None, distance=None): else: distance = GeoUtils.DISTANCE_SHORTEST if distance == GeoUtils.DISTANCE_SHORTEST: - geo_index_obj.lat, geo_index_obj.lon = GeoUtils.midpoint_shortest( north_lat=north_lat, west_lon=west_lon, south_lat=south_lat, east_lon=east_lon) + geo_index_obj.lat, geo_index_obj.lon = GeoUtils.midpoint_shortest(north_lat=north_lat, west_lon=west_lon, south_lat=south_lat, east_lon=east_lon) elif distance == GeoUtils.DISTANCE_LONGEST: - geo_index_obj.lat, geo_index_obj.lon = GeoUtils.midpoint_longest( north_lat=north_lat, west_lon=west_lon, south_lat=south_lat, east_lon=east_lon) + geo_index_obj.lat, geo_index_obj.lon = GeoUtils.midpoint_longest(north_lat=north_lat, west_lon=west_lon, south_lat=south_lat, east_lon=east_lon) else: raise BadRequest("Distance type not specified") return geo_index_obj @@ -101,8 +101,8 @@ def calc_bounding_box_for_boxes(obj_list, key_mapping=None): res_bb['lat_north'] = max(lat_north_list) if lat_north_list else 0.0 res_bb['lat_south'] = min(lat_south_list) if lat_south_list else 0.0 - res_bb['lon_east'] = max(lon_east_list) if lon_east_list else 0.0 res_bb['lon_west'] = min(lon_west_list) if lon_west_list else 0.0 + res_bb['lon_east'] = max(lon_east_list) if lon_east_list else 0.0 res_bb['depth_max'] = max(depth_min_list) if depth_min_list else 0.0 res_bb['depth_min'] = min(depth_max_list) if depth_max_list else 0.0 diff --git a/ion/util/test/test_geo_utils.py b/ion/util/test/test_geo_utils.py index 69e455165..f7abdc532 100644 --- a/ion/util/test/test_geo_utils.py +++ b/ion/util/test/test_geo_utils.py @@ -66,7 +66,9 @@ def test_midpoint(self): geospatial_bounds.geospatial_longitude_limit_west = 0 mid_point = GeoUtils.calc_geospatial_point_center(geospatial_bounds) self.assertAlmostEqual(mid_point.lat, 0.0) - self.assertAlmostEqual(mid_point.lon, -180.0) + self.assertAlmostEqual(mid_point.lon, 0.0) + # MM: changed this: if the same values are given, we expect a point not the full globe + #self.assertAlmostEqual(mid_point.lon, -180.0) geospatial_bounds.geospatial_latitude_limit_north = 40 geospatial_bounds.geospatial_latitude_limit_south = 50 From 560dbbbdf193ff8586e8a55265a5dd45cd624b1c Mon Sep 17 00:00:00 2001 From: Michael Meisinger Date: Tue, 16 Apr 2013 21:35:39 -0700 Subject: [PATCH 08/15] Added get_data_product_group_list to data product MS --- extern/ion-definitions | 2 +- extern/pyon | 2 +- .../data_product_management_service.py | 12 +++++ ..._product_management_service_integration.py | 49 ++++++------------- 4 files changed, 30 insertions(+), 35 deletions(-) diff --git a/extern/ion-definitions b/extern/ion-definitions index daf7b0dfc..17a7c30c4 160000 --- a/extern/ion-definitions +++ b/extern/ion-definitions @@ -1 +1 @@ -Subproject commit daf7b0dfc8963f94fb49a8dc9f618b4a67458a98 +Subproject commit 17a7c30c40fb4508c18ff7045e20d3ab9260ace8 diff --git a/extern/pyon b/extern/pyon index 218bf6140..96ffd8145 160000 --- a/extern/pyon +++ b/extern/pyon @@ -1 +1 @@ -Subproject commit 
218bf614010d6abbf2bf6f9684d5e386a657e91b +Subproject commit 96ffd8145d5d4b9e060b1c761fb199fdf1b537b1 diff --git a/ion/services/sa/product/data_product_management_service.py b/ion/services/sa/product/data_product_management_service.py index 2640175bf..3f0623983 100644 --- a/ion/services/sa/product/data_product_management_service.py +++ b/ion/services/sa/product/data_product_management_service.py @@ -540,6 +540,18 @@ def get_last_update(self, data_product_id=''): continue return retval + def get_data_product_group_list(self, org_id=''): + group_names = set() + + # TODO: the return volume of this can be reduced by making a reduce query. + res_ids, keys = self.clients.resource_registry.find_resources_ext(RT.DataProduct, attr_name="ooi_product_name", id_only=True) + for key in keys: + group_name = key.get('attr_value', None) + if group_name: + group_names.add(group_name) + + return sorted(list(group_names)) + def _get_dataset_id(self, data_product_id=''): # find datasets for the data product dataset_id = '' diff --git a/ion/services/sa/product/test/test_data_product_management_service_integration.py b/ion/services/sa/product/test/test_data_product_management_service_integration.py index f7332df38..78ee4cd70 100644 --- a/ion/services/sa/product/test/test_data_product_management_service_integration.py +++ b/ion/services/sa/product/test/test_data_product_management_service_integration.py @@ -3,24 +3,32 @@ @file ion/services/sa/product/test/test_data_product_management_service_integration.py @brief Data Product Management Service Integration Tests ''' -import simplejson + +import unittest, gevent +import numpy as np from mock import patch +from nose.plugins.attrib import attr +from gevent.event import Event + +from pyon.util.int_test import IonIntegrationTestCase from pyon.core.exception import NotFound from pyon.public import IonObject from pyon.public import RT, PRED, CFG, OT -from pyon.util.int_test import IonIntegrationTestCase from pyon.util.log import log from pyon.util.context import LocalContextMixin from pyon.util.containers import DotDict from pyon.datastore.datastore import DataStore from pyon.ion.stream import StandaloneStreamSubscriber, StandaloneStreamPublisher -from pyon.ion.exchange import ExchangeNameQueue from pyon.ion.event import EventSubscriber, EventPublisher from ion.processes.data.last_update_cache import CACHE_DATASTORE_NAME +from ion.services.dm.ingestion.test.ingestion_management_test import IngestionManagementIntTest from ion.services.dm.utility.granule_utils import time_series_domain from ion.services.dm.utility.granule import RecordDictionaryTool -from ion.util.parameter_yaml_IO import get_param_dict +from ion.services.dm.utility.test.parameter_helper import ParameterHelper +from ion.services.dm.test.test_dm_end_2_end import DatasetMonitor +from ion.util.stored_values import StoredValueManager + from interface.services.dm.iuser_notification_service import UserNotificationServiceClient from interface.services.cei.iprocess_dispatcher_service import ProcessDispatcherServiceClient from interface.services.coi.iresource_registry_service import ResourceRegistryServiceClient @@ -30,30 +38,9 @@ from interface.services.sa.idata_acquisition_management_service import DataAcquisitionManagementServiceClient from interface.services.sa.idata_product_management_service import DataProductManagementServiceClient from interface.services.dm.idata_retriever_service import DataRetrieverServiceClient - -from ion.util.stored_values import StoredValueManager from interface.objects import 
LastUpdate, ComputedValueAvailability, Granule, DataProduct -from ion.services.dm.ingestion.test.ingestion_management_test import IngestionManagementIntTest - -from nose.plugins.attrib import attr from interface.objects import ProcessDefinition, DataProducer, DataProcessProducerContext -from coverage_model.basic_types import AxisTypeEnum, MutabilityEnum -from coverage_model.coverage import CRS, GridDomain, GridShape -from coverage_model import ParameterContext, ParameterFunctionType, NumexprFunction, QuantityType - -from gevent.event import Event - -import unittest, gevent -import numpy as np - -from ion.services.dm.utility.granule import RecordDictionaryTool -from pyon.ion.event import EventSubscriber -from gevent.event import Event -from pyon.public import OT -from pyon.ion.stream import StandaloneStreamPublisher -from ion.services.dm.utility.test.parameter_helper import ParameterHelper -from ion.services.dm.test.test_dm_end_2_end import DatasetMonitor class FakeProcess(LocalContextMixin): name = '' @@ -174,6 +161,7 @@ def test_create_data_product(self): dp_obj.geospatial_bounds.geospatial_latitude_limit_south = -10.0 dp_obj.geospatial_bounds.geospatial_longitude_limit_east = 10.0 dp_obj.geospatial_bounds.geospatial_longitude_limit_west = -10.0 + dp_obj.ooi_product_name = "PRODNAME" #------------------------------------------------------------------------------------------------ # Create a set of ParameterContext objects to define the parameters in the coverage, add each to the ParameterDictionary @@ -222,6 +210,9 @@ def test_create_data_product(self): streamdefs.append(sd) self.assertIn(ctd_stream_def_id, streamdefs) + group_names = self.dpsc_cli.get_data_product_group_list() + self.assertIn("PRODNAME", group_names) + # test reading a non-existent data product log.debug('reading non-existent data product') @@ -283,8 +274,6 @@ def ion_object_encoder(obj): self.assertEqual(data_product_data.data_product_dataset[0].s, dp_id) - - # now 'delete' the data product log.debug("deleting data product: %s" % dp_id) self.dpsc_cli.delete_data_product(dp_id) @@ -317,8 +306,6 @@ def test_data_product_stream_def(self): sdom = sdom.dump() tdom = tdom.dump() - - dp_obj = IonObject(RT.DataProduct, name='DP1', description='some new dp', @@ -390,8 +377,6 @@ def cb(*args, **kwargs): self.assertEquals(set(rdt.fields), set(['time','temp'])) - - def test_activate_suspend_data_product(self): #------------------------------------------------------------------------------------------------ @@ -410,8 +395,6 @@ def test_activate_suspend_data_product(self): sdom = sdom.dump() tdom = tdom.dump() - - dp_obj = IonObject(RT.DataProduct, name='DP1', description='some new dp', From 6b7e3c2686ccf6b55eff67879e4eba41b022cc61 Mon Sep 17 00:00:00 2001 From: Maurice Manning Date: Wed, 17 Apr 2013 12:02:16 -0700 Subject: [PATCH 09/15] update to test aggregate and rollup status from platform and child instrument --- .../test/base_test_platform_agent_with_rsn.py | 73 +++++++-- .../test/test_platform_instrument.py | 153 +++++++++++++++--- 2 files changed, 190 insertions(+), 36 deletions(-) diff --git a/ion/agents/platform/test/base_test_platform_agent_with_rsn.py b/ion/agents/platform/test/base_test_platform_agent_with_rsn.py index ebae09e03..98b2f2ef2 100644 --- a/ion/agents/platform/test/base_test_platform_agent_with_rsn.py +++ b/ion/agents/platform/test/base_test_platform_agent_with_rsn.py @@ -36,7 +36,7 @@ from pyon.public import log import logging from pyon.public import IonObject -from pyon.core.exception import ServerError 
+from pyon.core.exception import ServerError, Conflict from pyon.util.int_test import IonIntegrationTestCase @@ -58,9 +58,12 @@ from nose.plugins.attrib import attr from pyon.agent.agent import ResourceAgentClient +from pyon.agent.agent import ResourceAgentState +from pyon.agent.agent import ResourceAgentEvent + from interface.objects import AgentCommand, ProcessStateEnum from interface.objects import StreamConfiguration -from interface.objects import StreamAlertType +from interface.objects import StreamAlertType, AggregateStatusType from ion.agents.port.port_agent_process import PortAgentProcessType, PortAgentType @@ -196,8 +199,8 @@ # The value should probably be defined in pyon.yml or some common place so # clients don't have to do updates upon new versions of the egg. -SBE37_EGG = "http://sddevrepo.oceanobservatories.org/releases/seabird_sbe37smb_ooicore-0.1.0-py2.7.egg" - +#SBE37_EGG = "http://sddevrepo.oceanobservatories.org/releases/seabird_sbe37smb_ooicore-0.1.1-py2.7.egg" +SBE37_EGG = "http://sddevrepo.oceanobservatories.org/releases/seabird_sbe37smb_ooicore-0.1.1-py2.7.egg" class FakeProcess(LocalContextMixin): """ @@ -422,12 +425,12 @@ def _get_instrument_stream_configs(self): configs copied from test_activate_instrument.py """ return [ - StreamConfiguration(stream_name='ctd_raw', + StreamConfiguration(stream_name='raw', parameter_dictionary_name='ctd_raw_param_dict', records_per_granule=2, granule_publish_rate=5), - StreamConfiguration(stream_name='ctd_parsed', + StreamConfiguration(stream_name='parsed', parameter_dictionary_name='ctd_parsed_param_dict', records_per_granule=2, granule_publish_rate=5) ] @@ -714,19 +717,31 @@ def _make_instrument_agent_structure(self, instr_key, org_obj, agent_config=None log.debug("new InstrumentDevice id = %s ", instrument_device_id) #Create stream alarms - alert_def = { + + + temp_alert_def = { 'name' : 'temperature_warning_interval', - 'stream_name' : 'ctd_parsed', + 'stream_name' : 'parsed', 'message' : 'Temperature is below the normal range of 50.0 and above.', 'alert_type' : StreamAlertType.WARNING, + 'aggregate_type' : AggregateStatusType.AGGREGATE_DATA, 'value_id' : 'temp', - 'resource_id' : instrument_device_id, - 'origin_type' : 'device', 'lower_bound' : 50.0, 'lower_rel_op' : '<', 'alert_class' : 'IntervalAlert' } + late_data_alert_def = { + 'name' : 'late_data_warning', + 'stream_name' : 'parsed', + 'message' : 'Expected data has not arrived.', + 'alert_type' : StreamAlertType.WARNING, + 'aggregate_type' : AggregateStatusType.AGGREGATE_COMMS, + 'value_id' : None, + 'time_delta' : 2, + 'alert_class' : 'LateDataAlert' + } + instrument_driver_config = self._set_up_pre_environment_for_instrument(instr_info) port_agent_config = { @@ -747,7 +762,7 @@ def _make_instrument_agent_structure(self, instr_key, org_obj, agent_config=None description="SBE37IMAgentInstance_%s" % instr_key, driver_config=instrument_driver_config, port_agent_config=port_agent_config, - alerts=[alert_def]) + alerts=[temp_alert_def, late_data_alert_def]) instrument_agent_instance_obj.agent_config = agent_config @@ -1223,3 +1238,39 @@ def _check_sync(self): self.assertEquals(retval.result[0:3], "OK:") return retval.result + def _stream_instruments(self): + from mi.instrument.seabird.sbe37smb.ooicore.driver import SBE37ProtocolEvent + from mi.instrument.seabird.sbe37smb.ooicore.driver import SBE37Parameter + + for instrument in self._setup_instruments.itervalues(): + # instruments that have been set up: instr_key: i_obj + + # Start a resource agent client to talk with the 
instrument agent. + _ia_client = self._create_resource_agent_client(instrument.instrument_device_id) + + cmd = AgentCommand(command=SBE37ProtocolEvent.START_AUTOSAMPLE) + retval = _ia_client.execute_resource(cmd) + log.debug('_stream_instruments retval: %s', retval) + + return + + def _idle_instruments(self): + from mi.instrument.seabird.sbe37smb.ooicore.driver import SBE37ProtocolEvent + from mi.instrument.seabird.sbe37smb.ooicore.driver import SBE37Parameter + + for instrument in self._setup_instruments.itervalues(): + # instruments that have been set up: instr_key: i_obj + + # Start a resource agent client to talk with the instrument agent. + _ia_client = self._create_resource_agent_client(instrument.instrument_device_id) + + cmd = AgentCommand(command=SBE37ProtocolEvent.STOP_AUTOSAMPLE) + with self.assertRaises(Conflict): + retval = _ia_client.execute_resource(cmd) + + cmd = AgentCommand(command=ResourceAgentEvent.RESET) + retval = _ia_client.execute_agent(cmd) + state = _ia_client.get_agent_state() + self.assertEqual(state, ResourceAgentState.UNINITIALIZED) + + return \ No newline at end of file diff --git a/ion/services/sa/observatory/test/test_platform_instrument.py b/ion/services/sa/observatory/test/test_platform_instrument.py index 9ca2e3eb5..d3f356477 100644 --- a/ion/services/sa/observatory/test/test_platform_instrument.py +++ b/ion/services/sa/observatory/test/test_platform_instrument.py @@ -25,6 +25,9 @@ from pyon.agent.agent import ResourceAgentClient from ion.agents.platform.test.base_test_platform_agent_with_rsn import FakeProcess from pyon.agent.agent import ResourceAgentState +from pyon.event.event import EventSubscriber + +from interface.services.sa.iinstrument_management_service import InstrumentManagementServiceClient from interface.objects import AgentCommand import unittest @@ -69,15 +72,24 @@ @patch.dict(CFG, {'endpoint': {'receive': {'timeout': 180}}}) -class Test(BaseIntTestPlatform): +class TestPlatformInstrument(BaseIntTestPlatform): + +# def setUp(self): +# # Start container +# super(TestPlatformInstrument, self).setUp() +# +# self.imsclient = InstrumentManagementServiceClient(node=self.container.node) + - @unittest.skip('This test is immature.') + + @unittest.skip('This test takes too long and gets Connect Refused errors.') def test_platform_with_instrument_streaming(self): # # The following is with just a single platform and the single # instrument "SBE37_SIM_08", which corresponds to the one on port 4008. 
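# ----------------------------------------------------------------------
# A minimal sketch of the agent-command round-trip that the
# _stream_instruments/_idle_instruments helpers above perform; `ia_client`
# is assumed to come from _create_resource_agent_client(), and the
# RESET/UNINITIALIZED behavior mirrors the assertions in _idle_instruments.

from pyon.agent.agent import ResourceAgentEvent, ResourceAgentState
from interface.objects import AgentCommand
from mi.instrument.seabird.sbe37smb.ooicore.driver import SBE37ProtocolEvent

def start_streaming(ia_client):
    # Driver/protocol events (e.g. START_AUTOSAMPLE) go through execute_resource()
    cmd = AgentCommand(command=SBE37ProtocolEvent.START_AUTOSAMPLE)
    return ia_client.execute_resource(cmd)

def reset_agent(ia_client):
    # Agent lifecycle events go through execute_agent(); RESET drops the
    # agent back to the UNINITIALIZED state
    cmd = AgentCommand(command=ResourceAgentEvent.RESET)
    ia_client.execute_agent(cmd)
    assert ia_client.get_agent_state() == ResourceAgentState.UNINITIALIZED
# ----------------------------------------------------------------------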
# instr_key = "SBE37_SIM_08" + self.catch_alert= gevent.queue.Queue() p_root = self._set_up_single_platform_with_some_instruments([instr_key]) @@ -90,27 +102,29 @@ def test_platform_with_instrument_streaming(self): self._run() # note that this includes the instrument also getting to the command state + self._stream_instruments() + # get client to the instrument: # the i_obj is a DotDict with various pieces captured during the # set-up of the instrument, in particular instrument_device_id i_obj = self._get_instrument(instr_key) - log.debug("KK creating ResourceAgentClient") - ia_client = ResourceAgentClient(i_obj.instrument_device_id, - process=FakeProcess()) - log.debug("KK got ResourceAgentClient: %s", ia_client) - - # verify the instrument is command state: - state = ia_client.get_agent_state() - log.debug("KK instrument state: %s", state) - self.assertEqual(state, ResourceAgentState.COMMAND) - - # start streaming: - log.debug("KK starting instrument streaming: %s", state) - cmd = AgentCommand(command=SBE37ProtocolEvent.START_AUTOSAMPLE) - - # NOTE: commented out because of error (see other #!! lines) - #!! self._ia_client.execute_resource(cmd) +# log.debug("KK creating ResourceAgentClient") +# ia_client = ResourceAgentClient(i_obj.instrument_device_id, +# process=FakeProcess()) +# log.debug("KK got ResourceAgentClient: %s", ia_client) +# +# # verify the instrument is command state: +# state = ia_client.get_agent_state() +# log.debug("KK instrument state: %s", state) +# self.assertEqual(state, ResourceAgentState.COMMAND) + +# # start streaming: +# log.debug("KK starting instrument streaming: %s", state) +# cmd = AgentCommand(command=SBE37ProtocolEvent.START_AUTOSAMPLE) +# +# # NOTE: commented out because of error (see other #!! lines) +# self._ia_client.execute_resource(cmd) """ 2013-04-03 14:17:22,018 DEBUG Dummy-7 ion.services.sa.observatory.test.test_platform_instrument:121 KK starting instrument streaming: RESOURCE_AGENT_STATE_COMMAND ERROR @@ -132,20 +146,109 @@ def test_platform_with_instrument_streaming(self): """ # TODO set up listeners to verify things ... - # ... + #------------------------------------------------------------------------------------- + # Set up the subscriber to catch the alert event + #------------------------------------------------------------------------------------- + + def callback_for_alert(event, *args, **kwargs): + #log.debug("caught an alert: %s", event) + log.debug('TestPlatformInstrument recieved ION event: args=%s, kwargs=%s, event=%s.', + str(args), str(kwargs), str(args[0])) + log.debug('TestPlatformInstrument recieved ION event obj %s: ', event) + + # Get a resource agent client to talk with the instrument agent. + _ia_client = self._create_resource_agent_client(event.origin) + instAggStatus = _ia_client.get_agent(['aggstatus'])['aggstatus'] + log.debug('callback_for_alert consume_event aggStatus: %s', instAggStatus) + + if event.name == "temperature_warning_interval" and event.sub_type == "WARNING": + log.debug('temperature_warning_interval WARNING: ') + self.assertEqual(instAggStatus[2], 3) + + if event.name == "late_data_warning" and event.sub_type == "WARNING": + log.debug('LATE DATA WARNING: ') + #check for WARNING or OK becuase the ALL Clear event comes too quicky.. 
+ self.assertTrue(instAggStatus[1] >= 2 ) + + # + # extended_instrument = self.imsclient.get_instrument_device_extension(i_obj.instrument_device_id) + # log.debug(' callback_for_alert communications_status_roll_up: %s', extended_instrument.computed.communications_status_roll_up) + # log.debug(' callback_for_alert data_status_roll_up: %s', extended_instrument.computed.data_status_roll_up) + + self.catch_alert.put(event) + + + def callback_for_agg_alert(event, *args, **kwargs): + #log.debug("caught an alert: %s", event) + log.debug('TestPlatformInstrument received AggStatus event: args=%s, kwargs=%s, event=%s.', + str(args), str(kwargs), str(args[0])) + log.debug('TestPlatformInstrument received AggStatus event obj %s: ', event) + + log.debug('TestPlatformInstrument received AggStatus event origin_type: %s ', event.origin_type) + log.debug('TestPlatformInstrument received AggStatus event origin: %s: ', event.origin) + + # Get a resource agent client to talk with the instrument agent. + _ia_client = self._create_resource_agent_client(event.origin) + aggstatus = _ia_client.get_agent(['aggstatus'])['aggstatus'] + log.debug('callback_for_agg_alert aggStatus: %s', aggstatus) + agg_status_comms = aggstatus[1] + agg_status_data = aggstatus[2] + + #platform status lags so check that instrument device status is at least known + if event.origin_type == "InstrumentDevice": + self.assertTrue(agg_status_comms >= 2) + + if event.origin_type == "PlatformDevice": + log.debug('PlatformDevice AggStatus ') + rollup_status = _ia_client.get_agent(['rollup_status'])['rollup_status'] + log.debug('callback_for_agg_alert rollup_status: %s', rollup_status) + rollup_status_comms = rollup_status[1] + rollup_status_data = rollup_status[2] + self.assertTrue(rollup_status_comms >= agg_status_comms ) + self.assertTrue(rollup_status_data >= agg_status_data ) + + child_agg_status = _ia_client.get_agent(['child_agg_status'])['child_agg_status'] + log.debug('callback_for_agg_alert child_agg_status: %s', child_agg_status) + #only one child instrument + child1_agg_status = child_agg_status[i_obj.instrument_device_id] + child1_agg_status_data = child1_agg_status[2] + self.assertTrue(rollup_status_data >= child1_agg_status_data ) + + self.catch_alert.put(event) + + #create a subscriber for the StreamAlertEvent from the instrument + self.event_subscriber = EventSubscriber(event_type='StreamAlertEvent', + origin=i_obj.instrument_device_id, + callback=callback_for_alert) + self.event_subscriber.start() + self.addCleanup(self.event_subscriber.stop) + + + #create a subscriber for the DeviceAggregateStatusEvent from the instrument and platform + self.event_subscriber = EventSubscriber(event_type='DeviceAggregateStatusEvent', + callback=callback_for_agg_alert) + self.event_subscriber.start() + self.addCleanup(self.event_subscriber.stop) + # sleep to let the streaming run for a while - #!! log.debug("KK sleeping ...") - #!! gevent.sleep(15) + log.debug("KK sleeping ...") + gevent.sleep(30) + + caught_events = [self.catch_alert.get(timeout=45)] + caught_events.append(self.catch_alert.get(timeout=45)) + log.debug("caught_events: %s", [c.type_ for c in caught_events]) - # stop streaming: - #!! log.debug("KK stopping instrument streaming: %s", state) - #!! cmd = AgentCommand(command=SBE37ProtocolEvent.STOP_AUTOSAMPLE) - #!!
self._ia_client.execute_resource(cmd) +# # stop streaming: +# log.debug("KK stopping instrument streaming: %s", state) +# cmd = AgentCommand(command=SBE37ProtocolEvent.STOP_AUTOSAMPLE) +# self._ia_client.execute_resource(cmd) # TODO verifications ... # ... + + self._idle_instruments() # then shutdown the network: self._go_inactive() self._reset() From 19c812678d8706df6b873976bca6f87f4a756811 Mon Sep 17 00:00:00 2001 From: Ian Katz Date: Wed, 17 Apr 2013 15:39:44 -0400 Subject: [PATCH 10/15] Add new site extension hierarchy --- .../observatory_management_service.py | 122 ++++++++---------- ...ervatory_management_service_integration.py | 47 ++++++- ion/util/enhanced_resource_registry_client.py | 13 ++ ion/util/related_resources_crawler.py | 43 +++--- 4 files changed, 136 insertions(+), 89 deletions(-) diff --git a/ion/services/sa/observatory/observatory_management_service.py b/ion/services/sa/observatory/observatory_management_service.py index 5d372ed16..b29d732d8 100644 --- a/ion/services/sa/observatory/observatory_management_service.py +++ b/ion/services/sa/observatory/observatory_management_service.py @@ -771,7 +771,7 @@ def find_site_data_products(self, parent_resource_id='', include_sites=False, in ############################ - def get_site_extension(self, site_id='', ext_associations=None, ext_exclude=None, user_id=''): + def _get_site_extension(self, site_id='', ext_associations=None, ext_exclude=None, user_id=''): """Returns an InstrumentDeviceExtension object containing additional related information @param site_id str @@ -795,6 +795,9 @@ def get_site_extension(self, site_id='', ext_associations=None, ext_exclude=None ext_exclude=ext_exclude, user_id=user_id) + RR2 = EnhancedResourceRegistryClient(self.RR) + RR2.cache_predicate(PRED.hasModel) + # Get status of Site instruments. a, b = self._get_instrument_states(extended_site.instrument_devices) extended_site.instruments_operational, extended_site.instruments_not_operational = a, b @@ -802,9 +805,8 @@ def get_site_extension(self, site_id='', ext_associations=None, ext_exclude=None # lookup all hasModel predicates # lookup is a 2d associative array of [subject type][subject id] -> object id lookup = dict([(rt, {}) for rt in [RT.InstrumentDevice, RT.PlatformDevice]]) - for a in self.RR.find_associations(predicate=PRED.hasModel, id_only=False): - if a.st in lookup: - lookup[a.st][a.s] = a.o + for a in RR2.filter_cached_associations(PRED.hasModel, lambda assn: assn.st in lookup): + lookup[a.st][a.s] = a.o def retrieve_model_objs(rsrc_list, object_type): # rsrc_list is devices that need models looked up. 
object_type is the resource type (a device) @@ -859,98 +861,86 @@ def short_status_rollup(key): except Exception as ex: log.exception("Computed attribute failed for site %s" % site_id) - return extended_site + return extended_site, RR2 - def get_observatory_site_extension(self, site_id='', ext_associations=None, ext_exclude=None, user_id=''): + def _get_site_extension_plus(self, site_id='', ext_associations=None, ext_exclude=None, user_id=''): + # the "plus" means "plus all sub-site objects" - if not site_id: - raise BadRequest("The site_id parameter is empty") + extended_site, RR2 = self._get_site_extension(site_id, ext_associations, ext_exclude, user_id) - extended_resource_handler = ExtendedResourceContainer(self) + # use the related resources crawler + finder = RelatedResourcesCrawler() + get_assns = finder.generate_related_resources_partial(RR2, [PRED.hasSite]) + full_crawllist = [RT.InstrumentSite, RT.PlatformSite, RT.Subsite] + search_down = get_assns({PRED.hasSite: (True, False)}, full_crawllist) - extended_site = extended_resource_handler.create_extended_resource_container( - extended_resource_type=OT.SiteExtension, - resource_id=site_id, - computed_resource_type=OT.SiteComputedAttributes, - ext_associations=ext_associations, - ext_exclude=ext_exclude, - user_id=user_id) + # the searches return a list of association objects, so compile all the ids by extracting them + subsite_ids = set([]) - return extended_site + # we want only those IDs that are not the input resource id + for a in search_down(site_id, -1): + if a.o != site_id: + subsite_ids.add(a.o) + log.trace("converting retrieved ids to objects = %s" % subsite_ids) + subsite_objs = RR2.read_mult(list(subsite_ids)) - def get_platform_station_site_extension(self, site_id='', ext_associations=None, ext_exclude=None, user_id=''): + # filtered subsites + def fs(resource_type, filter_fn): + both = lambda s: ((resource_type == s._get_type()) and filter_fn(s)) + return filter(both, subsite_objs) - if not site_id: - raise BadRequest("The site_id parameter is empty") + def pfs(filter_fn): + return fs(RT.PlatformSite, filter_fn) - extended_resource_handler = ExtendedResourceContainer(self) + def ifs(filter_fn): + return fs(RT.InstrumentSite, filter_fn) - extended_site = extended_resource_handler.create_extended_resource_container( - extended_resource_type=OT.SiteExtension, - resource_id=site_id, - computed_resource_type=OT.SiteComputedAttributes, - ext_associations=ext_associations, - ext_exclude=ext_exclude, - user_id=user_id) + extended_site.computed.platform_station_sites = pfs(lambda s: "StationSite" == s.alt_resource_type) + extended_site.computed.platform_component_sites = pfs(lambda s: "PlatformComponentSite" == s.alt_resource_type) + extended_site.computed.platform_assembly_sites = pfs(lambda s: "PlatformAssemblySite" == s.alt_resource_type) + extended_site.computed.instrument_sites = ifs(lambda _: True) - return extended_site + return extended_site, RR2, subsite_objs + # TODO: will remove this one + def get_site_extension(self, site_id='', ext_associations=None, ext_exclude=None, user_id=''): + extended_site, _ = self._get_site_extension(site_id, ext_associations, ext_exclude, user_id) + return extended_site - def get_platform_assembly_site_extension(self, site_id='', ext_associations=None, ext_exclude=None, user_id=''): + def get_observatory_site_extension(self, site_id='', ext_associations=None, ext_exclude=None, user_id=''): + extended_site, RR2, subsite_objs = self._get_site_extension_plus(site_id, ext_associations, 
ext_exclude, user_id) + return extended_site - if not site_id: - raise BadRequest("The site_id parameter is empty") - extended_resource_handler = ExtendedResourceContainer(self) + def get_platform_station_site_extension(self, site_id='', ext_associations=None, ext_exclude=None, user_id=''): + extended_site, RR2, subsite_objs = self._get_site_extension_plus(site_id, ext_associations, ext_exclude, user_id) + return extended_site - extended_site = extended_resource_handler.create_extended_resource_container( - extended_resource_type=OT.SiteExtension, - resource_id=site_id, - computed_resource_type=OT.SiteComputedAttributes, - ext_associations=ext_associations, - ext_exclude=ext_exclude, - user_id=user_id) + def get_platform_assembly_site_extension(self, site_id='', ext_associations=None, ext_exclude=None, user_id=''): + extended_site, RR2, subsite_objs = self._get_site_extension_plus(site_id, ext_associations, ext_exclude, user_id) return extended_site def get_platform_component_site_extension(self, site_id='', ext_associations=None, ext_exclude=None, user_id=''): - - if not site_id: - raise BadRequest("The site_id parameter is empty") - - extended_resource_handler = ExtendedResourceContainer(self) - - extended_site = extended_resource_handler.create_extended_resource_container( - extended_resource_type=OT.SiteExtension, - resource_id=site_id, - computed_resource_type=OT.SiteComputedAttributes, - ext_associations=ext_associations, - ext_exclude=ext_exclude, - user_id=user_id) - + extended_site, RR2, subsite_objs = self._get_site_extension_plus(site_id, ext_associations, ext_exclude, user_id) return extended_site def get_instrument_site_extension(self, site_id='', ext_associations=None, ext_exclude=None, user_id=''): + extended_site, _ = self._get_site_extension(site_id, ext_associations, ext_exclude, user_id) - if not site_id: - raise BadRequest("The site_id parameter is empty") - - extended_resource_handler = ExtendedResourceContainer(self) - - extended_site = extended_resource_handler.create_extended_resource_container( - extended_resource_type=OT.SiteExtension, - resource_id=site_id, - computed_resource_type=OT.SiteComputedAttributes, - ext_associations=ext_associations, - ext_exclude=ext_exclude, - user_id=user_id) + # no subsites of instrument, so shortcut + extended_site.computed.platform_station_sites = [] + extended_site.computed.platform_component_sites = [] + extended_site.computed.platform_assembly_sites = [] + extended_site.computed.instrument_sites = [] return extended_site + #Bogus functions for computed attributes def get_number_data_sets(self, observatory_id): return "0" diff --git a/ion/services/sa/observatory/test/test_observatory_management_service_integration.py b/ion/services/sa/observatory/test/test_observatory_management_service_integration.py index e1720eb0b..4704f1d50 100644 --- a/ion/services/sa/observatory/test/test_observatory_management_service_integration.py +++ b/ion/services/sa/observatory/test/test_observatory_management_service_integration.py @@ -2,6 +2,7 @@ #from pyon.ion.endpoint import ProcessRPCClient import string import unittest +from ion.util.enhanced_resource_registry_client import EnhancedResourceRegistryClient from pyon.util.containers import DotDict, get_ion_ts from pyon.util.int_test import IonIntegrationTestCase @@ -57,6 +58,7 @@ def setUp(self): self.container.start_rel_from_url('res/deploy/r2deploy.yml') self.RR = ResourceRegistryServiceClient(node=self.container.node) + self.RR2 = EnhancedResourceRegistryClient(self.RR) self.OMS = 
ObservatoryManagementServiceClient(node=self.container.node) self.org_management_service = OrgManagementServiceClient(node=self.container.node) self.IMS = InstrumentManagementServiceClient(node=self.container.node) @@ -311,8 +313,8 @@ def _make_associations(self): org_id = self.OMS.create_marine_facility(any_old(RT.Org)) - def create_under_org(resource_type): - obj = any_old(resource_type) + def create_under_org(resource_type, extra_fields=None): + obj = any_old(resource_type, extra_fields) if RT.InstrumentDevice == resource_type: resource_id = self.IMS.create_instrument_device(obj) @@ -572,6 +574,47 @@ def _do_test_find_observatory_org(self, resources): self.OMS.force_delete_instrument_site(instrument_site_id) + @attr('EXT') + def test_observatory_extensions(self): + + + + obs_id = self.RR2.create(any_old(RT.Observatory)) + pss_id = self.RR2.create(any_old(RT.PlatformSite, dict(alt_resource_type="StationSite"))) + pas_id = self.RR2.create(any_old(RT.PlatformSite, dict(alt_resource_type="PlatformAssemblySite"))) + pcs_id = self.RR2.create(any_old(RT.PlatformSite, dict(alt_resource_type="PlatformComponentSite"))) + ins_id = self.RR2.create(any_old(RT.InstrumentSite)) + + obs_obj = self.RR2.read(obs_id) + pss_obj = self.RR2.read(pss_id) + pas_obj = self.RR2.read(pas_id) + pcs_obj = self.RR2.read(pcs_id) + ins_obj = self.RR2.read(ins_id) + + self.RR2.create_association(obs_id, PRED.hasSite, pss_id) + self.RR2.create_association(pss_id, PRED.hasSite, pas_id) + self.RR2.create_association(pas_id, PRED.hasSite, pcs_id) + self.RR2.create_association(pcs_id, PRED.hasSite, ins_id) + + extended_obs = self.OMS.get_observatory_site_extension(obs_id, user_id=12345) + self.assertEqual([pss_obj], extended_obs.computed.platform_station_sites) + self.assertEqual([pas_obj], extended_obs.computed.platform_assembly_sites) + self.assertEqual([pcs_obj], extended_obs.computed.platform_component_sites) + self.assertEqual([ins_obj], extended_obs.computed.instrument_sites) + + extended_pss = self.OMS.get_platform_station_site_extension(pss_id, user_id=12345) + self.assertEqual([pas_obj], extended_pss.computed.platform_assembly_sites) + self.assertEqual([pcs_obj], extended_pss.computed.platform_component_sites) + self.assertEqual([ins_obj], extended_pss.computed.instrument_sites) + + extended_pas = self.OMS.get_platform_assembly_site_extension(pas_id, user_id=12345) + self.assertEqual([pcs_obj], extended_pas.computed.platform_component_sites) + self.assertEqual([ins_obj], extended_pas.computed.instrument_sites) + + extended_pcs = self.OMS.get_platform_component_site_extension(pcs_id, user_id=12345) + self.assertEqual([ins_obj], extended_pcs.computed.instrument_sites) + + #@unittest.skip("in development...") @attr('EXT') def test_observatory_org_extended(self): diff --git a/ion/util/enhanced_resource_registry_client.py b/ion/util/enhanced_resource_registry_client.py index 4c7bda245..259f0ae28 100644 --- a/ion/util/enhanced_resource_registry_client.py +++ b/ion/util/enhanced_resource_registry_client.py @@ -90,6 +90,7 @@ def __init__(self, rr_client): #raise BadRequest(str(mults)) # + # TODO: s/_cached_/_fetched_/g self._cached_predicates = {} self._cached_resources = {} @@ -472,6 +473,9 @@ def advance_lcs(self, resource_id, transition_event): def cache_predicate(self, predicate): """ Save all associations of a given predicate type to memory, for in-memory find_subjects/objects ops + + This is a PREFETCH operation, and EnhancedResourceRegistryClient objects that use the cache functionality + should NOT be persisted across service
calls. """ log.info("Caching predicates: %s", predicate) log.debug("This cache is %s", self) @@ -485,6 +489,12 @@ def cache_predicate(self, predicate): self._cached_predicates[predicate] = preds + def filter_cached_associations(self, predicate, is_match_fn): + if not self.has_cached_prediate(predicate): + raise BadRequest("Attempted to filter cached associations of uncached predicate '%s'" % predicate) + + return [a for a in self._cached_predicates[predicate] if is_match_fn(a)] + def _add_resource_to_cache(self, resource_type, resource_obj): self._cached_resources[resource_type].by_id[resource_obj._id] = resource_obj @@ -497,6 +507,9 @@ def _add_resource_to_cache(self, resource_type, resource_obj): def cache_resources(self, resource_type, specific_ids=None): """ Save all resources of a given type to memory, for in-memory lookup ops + + This is a PREFETCH operation, and EnhancedResourceRegistryClient objects that use the cache functionality + should NOT be persisted across service calls. """ log.info("Caching resources: %s", resource_type) log.debug("This cache is %s", self) diff --git a/ion/util/related_resources_crawler.py b/ion/util/related_resources_crawler.py index d83fe9d66..1bafb2c57 100644 --- a/ion/util/related_resources_crawler.py +++ b/ion/util/related_resources_crawler.py @@ -4,9 +4,9 @@ @package ion.util.related_resources_crawler @author Ian Katz """ +from ion.util.enhanced_resource_registry_client import EnhancedResourceRegistryClient from ooi.logging import log -from ooi import logging class RelatedResourcesCrawler(object): @@ -25,24 +25,20 @@ def generate_related_resources_partial(self, returning an association list """ - # basically a lambda function: add a list of associations-matching-a-predicate to an accumulated list - def collect(acc, somepredicate): - return acc + resource_registry_client.find_associations(predicate=somepredicate, id_only=False) + # we need to "freeze" the partial function, by evaluating its internal data one time. 
def freeze(): - # get the full association list - master_assn_list = reduce(collect, predicate_list, []) + if isinstance(resource_registry_client, EnhancedResourceRegistryClient): + RR2 = resource_registry_client + else: + RR2 = EnhancedResourceRegistryClient(resource_registry_client) - if log.isEnabledFor(logging.TRACE): - summary = {} - for a in master_assn_list: - label = "%s %s %s" % (a.st, a.p, a.ot) - if not label in summary: summary[label] = 0 - summary[label] += 1 + for p in predicate_list: + if not RR2.has_cached_prediate(p): + RR2.cache_predicate(p) - log.trace("master assn list is %s", ["%s x%d" % (k, v) for k, v in summary.iteritems()]) def get_related_resources_partial_fn(predicate_dictionary, resource_whitelist): """ @@ -68,6 +64,8 @@ def get_related_resources_partial_fn(predicate_dictionary, resource_whitelist): assert type((True, True)) == type(v) assert type([]) == type(resource_whitelist) + for rt in resource_whitelist: + RR2.cache_resources(rt) def lookup_fn(resource_id): """ @@ -77,15 +75,18 @@ def lookup_fn(resource_id): """ retval = {} - for a in master_assn_list: - search_sto, search_ots = predicate_dictionary[a.p] + sto_match = lambda assn: assn.s == resource_id and assn.ot in resource_whitelist + ots_match = lambda assn: assn.o == resource_id and assn.st in resource_whitelist + for p, (search_sto, search_ots) in predicate_dictionary.iteritems(): + if search_sto: + for a in RR2.filter_cached_associations(p, sto_match): + log.trace("lookup_fn matched %s object", a.ot) + retval[a.o] = a + if search_ots: + for a in RR2.filter_cached_associations(p, ots_match): + log.trace("lookup_fn matched %s subject", a.st) + retval[a.s] = a - if search_sto and a.s == resource_id and a.ot in resource_whitelist: - log.trace("lookup_fn matched %s object", a.ot) - retval[a.o] = a - elif search_ots and a.o == resource_id and a.st in resource_whitelist: - log.trace("lookup_fn matched %s subject", a.st) - retval[a.s] = a return retval From c2314284e193f683847f44fa861ee0e77ae088b2 Mon Sep 17 00:00:00 2001 From: Stephen Henrie Date: Wed, 17 Apr 2013 12:43:08 -0700 Subject: [PATCH 11/15] Refactored to make reusable --- .../sa/test/test_find_related_resources.py | 56 ++++++++++--------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/ion/services/sa/test/test_find_related_resources.py b/ion/services/sa/test/test_find_related_resources.py index 96230ee2b..196d386b3 100644 --- a/ion/services/sa/test/test_find_related_resources.py +++ b/ion/services/sa/test/test_find_related_resources.py @@ -36,29 +36,7 @@ def logger(fmt, *args): RT_SUBPLATFORMSITE = "SubPlatformSite" -@attr('INT', group='sa') -class TestFindRelatedResources(IonIntegrationTestCase): - """ - assembly integration tests at the service level - """ - - def setUp(self): - # Start container - self._start_container() - self.container.start_rel_from_url('res/deploy/r2deploy.yml') - - self.IMS = InstrumentManagementServiceClient(node=self.container.node) - self.OMS = ObservatoryManagementServiceClient(node=self.container.node) - - self.RR = ResourceRegistryServiceClient(node=self.container.node) - - self.care = {} - self.dontcare = {} - self.realtype = {} - -# @unittest.skip('this test just for debugging setup') -# def test_just_the_setup(self): -# return +class ResourceHelper(object): def create_any(self, resourcetype, first, label=None): @@ -185,6 +163,35 @@ def create_instrumentdevice(self, instrument_model_id, first=False): return instrumentdevice_id + + +@attr('INT', group='sa') +class 
TestFindRelatedResources(IonIntegrationTestCase, ResourceHelper): + """ + assembly integration tests at the service level + """ + + def setUp(self): + # Start container + self._start_container() + self.container.start_rel_from_url('res/deploy/r2deploy.yml') + + self.IMS = InstrumentManagementServiceClient(node=self.container.node) + self.OMS = ObservatoryManagementServiceClient(node=self.container.node) + + self.RR = ResourceRegistryServiceClient(node=self.container.node) + + + self.care = {} + self.dontcare = {} + self.realtype = {} + + +# @unittest.skip('this test just for debugging setup') +# def test_just_the_setup(self): +# return + + def create_dummy_structure(self): """ Create two observatories. @@ -210,7 +217,7 @@ def create_dummy_structure(self): for rt in [RT.Observatory, RT_SITE, RT.Subsite, RT.PlatformSite, RT_SUBPLATFORMSITE, RT.PlatformDevice, RT.PlatformModel, RT.InstrumentSite, RT.InstrumentDevice, RT.InstrumentModel - ]: + ]: self.assertIn(rt, self.care) self.expected_associations = [ @@ -266,7 +273,6 @@ def simplify_assn_resource_ids(self, assn_list): def describe_assn_graph(self, assn_list): return [("%s %s -> %s -> %s %s" % (a.st, a.s, a.p, a.ot, a.o)) for a in assn_list] - #@unittest.skip('refactoring') def test_related_resource_crawler(self): """ From 28ebdbf2ca2c94b8e591e0597f6298ab00fab05d Mon Sep 17 00:00:00 2001 From: Stephen Henrie Date: Wed, 17 Apr 2013 12:43:56 -0700 Subject: [PATCH 12/15] Fixed resource policy test and added log statements to debug builder issue --- ion/services/coi/test/test_governance.py | 14 +++++++------- .../sa/instrument/instrument_management_service.py | 7 +++++++ 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/ion/services/coi/test/test_governance.py b/ion/services/coi/test/test_governance.py index 02faf9121..e202b56aa 100644 --- a/ion/services/coi/test/test_governance.py +++ b/ion/services/coi/test/test_governance.py @@ -35,9 +35,7 @@ from ion.services.sa.observatory.observatory_management_service import INSTRUMENT_OPERATOR_ROLE, OBSERVATORY_OPERATOR_ROLE from pyon.net.endpoint import RPCClient, BidirectionalEndpointUnit -from ion.services.sa.test.test_find_related_resources import TestFindRelatedResources -from ion.util.related_resources_crawler import RelatedResourcesCrawler - +from ion.services.sa.test.test_find_related_resources import ResourceHelper # This import will dynamically load the driver egg. 
It is needed for the MI includes below import ion.agents.instrument.test.test_instrument_agent @@ -2037,7 +2035,7 @@ def test_instrument_lifecycle_policy(self): @attr('INT', group='coi') -class TestResourcePolicyInt(TestFindRelatedResources): +class TestResourcePolicyInt(IonIntegrationTestCase, ResourceHelper): def __init__(self, *args, **kwargs): @@ -2051,6 +2049,10 @@ def __init__(self, *args, **kwargs): else: log.info('Running on a Mac)') + self.care = {} + self.dontcare = {} + self.realtype = {} + IonIntegrationTestCase.__init__(self, *args, **kwargs) def setUp(self): @@ -2130,9 +2132,6 @@ def tearDown(self): gevent.sleep(self.SLEEP_TIME) # Wait for events to be fired and policy updated - @unittest.skip('Overriding to skip') - def test_related_resource_crawler(self): - pass # overriding to not test here def test_related_resource_policies(self): """ @@ -2143,6 +2142,7 @@ def test_related_resource_policies(self): self.create_observatory(True, create_with_marine_facility=True) self.create_observatory(False, create_with_marine_facility=True) + from ion.util.related_resources_crawler import RelatedResourcesCrawler r = RelatedResourcesCrawler() inst_devices,_ = self.rr_client.find_resources(restype=RT.InstrumentDevice) diff --git a/ion/services/sa/instrument/instrument_management_service.py b/ion/services/sa/instrument/instrument_management_service.py index 92b4dcdb5..071f3afb3 100644 --- a/ion/services/sa/instrument/instrument_management_service.py +++ b/ion/services/sa/instrument/instrument_management_service.py @@ -805,6 +805,8 @@ def check_direct_access_policy(self, process, message, headers): except Inconsistent, ex: return False, ex.message + log.debug("check_direct_access_policy: actor info: %s %s %s", gov_values.actor_id, gov_values.actor_roles, gov_values.resource_id) + #The system actor can to anything if has_org_role(gov_values.actor_roles , self.container.governance_controller.system_root_org_name, [ION_MANAGER]): return True, '' @@ -822,6 +824,7 @@ def check_device_lifecycle_policy(self, process, message, headers): except Inconsistent, ex: return False, ex.message + log.debug("check_device_lifecycle_policy: actor info: %s %s %s", gov_values.actor_id, gov_values.actor_roles, gov_values.resource_id) #The system actor can to anything if has_org_role(gov_values.actor_roles , self.container.governance_controller.system_root_org_name, [ION_MANAGER]): return True, '' @@ -831,6 +834,8 @@ def check_device_lifecycle_policy(self, process, message, headers): else: raise Inconsistent('%s(%s) has been denied since the lifecycle_event can not be found in the message'% (process.name, gov_values.op)) + log.debug("check_device_lifecycle_policy: lifecycle_event: %s", lifecycle_event) + orgs,_ = self.clients.resource_registry.find_subjects(subject_type=RT.Org, predicate=PRED.hasResource, object=gov_values.resource_id, id_only=False) if not orgs: @@ -848,11 +853,13 @@ def check_device_lifecycle_policy(self, process, message, headers): #The owner can do any of these other lifecycle transitions is_owner = is_resource_owner(gov_values.actor_id, gov_values.resource_id) + log.debug("check_device_lifecycle_policy: is_owner: %s", str(is_owner)) if is_owner: return True, '' #TODO - this shared commitment might not be with the right Org - may have to relook at how this is working. 
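# ----------------------------------------------------------------------
# A condensed sketch of the (allow, reason) contract these policy
# preconditions follow, where '' means "no objection". The helper names
# (GovernanceHeaderValues, has_org_role, is_resource_owner, ION_MANAGER)
# all appear in the service above, but the import paths and exact
# signatures here are assumptions, and `container` stands in for the
# service's self.container.

from pyon.core.exception import Inconsistent
from pyon.core.governance import (GovernanceHeaderValues, has_org_role,
                                  is_resource_owner, ION_MANAGER)  # assumed paths

def check_example_policy(container, process, message, headers):
    try:
        gov_values = GovernanceHeaderValues(headers=headers, process=process)
    except Inconsistent, ex:
        return False, ex.message

    # The system actor can do anything
    if has_org_role(gov_values.actor_roles,
                    container.governance_controller.system_root_org_name,
                    [ION_MANAGER]):
        return True, ''

    # Resource owners may proceed; everyone else gets a denial reason
    if is_resource_owner(gov_values.actor_id, gov_values.resource_id):
        return True, ''
    return False, '%s(%s) has been denied: actor %s is not a resource owner' % (
        process.name, gov_values.op, gov_values.actor_id)
# ----------------------------------------------------------------------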
is_shared = has_shared_resource_commitment(gov_values.actor_id, gov_values.resource_id) + log.debug("check_device_lifecycle_policy: is_shared: %s", str(is_shared)) #Check across Orgs which have shared this device for role which as proper level to allow lifecycle transition for org in orgs: From 1d918a2f5f509930d9eb2c1ee8ee2d14769ac016 Mon Sep 17 00:00:00 2001 From: Stephen Henrie Date: Wed, 17 Apr 2013 12:45:28 -0700 Subject: [PATCH 13/15] Bumped ion-defs --- extern/ion-definitions | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extern/ion-definitions b/extern/ion-definitions index 17a7c30c4..c8f074733 160000 --- a/extern/ion-definitions +++ b/extern/ion-definitions @@ -1 +1 @@ -Subproject commit 17a7c30c40fb4508c18ff7045e20d3ab9260ace8 +Subproject commit c8f074733d8976459de195548d1a01de8f77e149 From a2caaaa99cbcc4c152f106d3b2343b6ff3ff7ac6 Mon Sep 17 00:00:00 2001 From: Maurice Manning Date: Wed, 17 Apr 2013 13:02:03 -0700 Subject: [PATCH 14/15] bump ion-defs --- extern/ion-definitions | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extern/ion-definitions b/extern/ion-definitions index 17a7c30c4..f4aa663ea 160000 --- a/extern/ion-definitions +++ b/extern/ion-definitions @@ -1 +1 @@ -Subproject commit 17a7c30c40fb4508c18ff7045e20d3ab9260ace8 +Subproject commit f4aa663ea8a908d4662f574e1b9d1d2d2eabe586 From 4f4d28077ad1150a9e4b8bb38ac3158fe09fa7b0 Mon Sep 17 00:00:00 2001 From: Ian Katz Date: Wed, 17 Apr 2013 16:06:44 -0400 Subject: [PATCH 15/15] remove unused functions --- .../observatory_management_service.py | 27 ------------------- 1 file changed, 27 deletions(-) diff --git a/ion/services/sa/observatory/observatory_management_service.py b/ion/services/sa/observatory/observatory_management_service.py index b29d732d8..b139682ac 100644 --- a/ion/services/sa/observatory/observatory_management_service.py +++ b/ion/services/sa/observatory/observatory_management_service.py @@ -940,33 +940,6 @@ def get_instrument_site_extension(self, site_id='', ext_associations=None, ext_e return extended_site - - #Bogus functions for computed attributes - def get_number_data_sets(self, observatory_id): - return "0" - - def get_number_instruments_deployed(self, observatory_id): - return "0" - - - def get_number_instruments_operational(self, observatory_id): - return "0" - - - def get_number_instruments_inoperational(self, observatory_id): - return "0" - - - def get_number_instruments(self, observatory_id): - return "0" - - - def get_number_platforms(self, observatory_id): - return "0" - - def get_number_platforms_deployed(self, observatory_id): - return "0" - def _get_instrument_states(self, instrument_device_obj_list=None): op = []
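Taken together, the site-extension, crawler, and registry changes above share one pattern: prefetch the association table once through EnhancedResourceRegistryClient, then answer every traversal in memory. A condensed sketch of the crawler call sequence used by _get_site_extension_plus and by the governance test, assuming a plain resource registry client as input; the -1 depth argument mirrors the service code and is assumed to mean "unlimited":

    from pyon.public import PRED, RT
    from ion.util.related_resources_crawler import RelatedResourcesCrawler

    def find_subsite_ids(rr_client, site_id):
        finder = RelatedResourcesCrawler()

        # One-time "freeze": caches all hasSite associations in memory
        # (cache_predicate runs behind this call).
        get_assns = finder.generate_related_resources_partial(rr_client, [PRED.hasSite])

        # (True, False) follows associations subject -> object only,
        # i.e. downward from parent site to child site.
        search_down = get_assns({PRED.hasSite: (True, False)},
                                [RT.InstrumentSite, RT.PlatformSite, RT.Subsite])

        # Collect every descendant of site_id, excluding the root itself.
        return set(a.o for a in search_down(site_id, -1) if a.o != site_id)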