Merge remote-tracking branch 'upstream/master'

2 parents 9ce60e0 + 6c62b73 commit 51a298c9ffd3217ca1eb4072049db8775060919d @wfrench committed Dec 5, 2012
@@ -828,8 +828,7 @@ def _async_driver_event_sample(self, val, ts):
self._proc_name, stream_name)
except:
- log.error('Instrument agent %s could not publish data.',
- self._proc_name)
+ log.exception('Instrument agent %s could not publish data.', self._proc_name)
def _async_driver_event_error(self, val, ts):
"""
@@ -79,7 +79,7 @@
MASTER_DOC = "https://docs.google.com/spreadsheet/pub?key=0AttCeOvLP6XMdG82NHZfSEJJOGdQTkgzb05aRjkzMEE&output=xls"
### the URL below should point to a COPY of the master google spreadsheet that works with this version of the loader
-TESTED_DOC = "https://docs.google.com/spreadsheet/pub?key=0AgGScp7mjYjydHdjdndOVkUyazZQaUNfYzBUSXJ3Rnc&output=xls"
+TESTED_DOC = "https://docs.google.com/spreadsheet/pub?key=0AgkUKqO5m-ZidE01OXVvMnhraVZtM05rNkthQnVjU1E&output=xls"
#
### while working on changes to the google doc, use this to run test_loader.py against the master spreadsheet
#TESTED_DOC=MASTER_DOC
@@ -94,6 +94,7 @@
'CoordinateSystem',
'ParameterDefs',
'ParameterDictionary',
+ 'StreamConfiguration',
'SensorModel',
'PlatformModel',
'InstrumentModel',
@@ -171,7 +172,7 @@ def on_start(self):
self.unknown_fields = {} # track unknown fields so we only warn once
self.constraint_defs = {} # alias -> value for refs, since not stored in DB
self.contact_defs = {} # alias -> value for refs, since not stored in DB
-
+ self.stream_config = {} # name -> obj for StreamConfiguration objects, used by *AgentInstance
# Loads internal bootstrapped resource ids that will be referenced during preload
self._load_system_ids()
@@ -397,7 +398,7 @@ def _create_object_from_row(self, objtype, row, prefix='',
log.trace("Update object type %s using field names %s", objtype, obj_fields.keys())
obj = existing_obj
else:
- if row[self.COL_ID] and 'alt_ids' in schema:
+ if self.COL_ID in row and row[self.COL_ID] and 'alt_ids' in schema:
if 'alt_ids' in obj_fields:
obj_fields['alt_ids'].append("PRE:"+row[self.COL_ID])
else:
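
The widened guard protects against spreadsheet rows that lack the ID column entirely. A small standalone sketch (dict-based row, with COL_ID assumed to be 'ID') of why the extra membership test matters: indexing a missing key raises KeyError, while the short-circuiting `in` check falls through cleanly.

    COL_ID = 'ID'

    def alt_id_for(row):
        # 'COL_ID in row' short-circuits, so rows without an ID column no
        # longer raise KeyError; empty IDs are skipped as before
        if COL_ID in row and row[COL_ID]:
            return "PRE:" + row[COL_ID]
        return None

    assert alt_id_for({'ID': 'CTD01', 'name': 'ctd'}) == "PRE:CTD01"
    assert alt_id_for({'name': 'row without ID column'}) is None
    assert alt_id_for({'ID': '', 'name': 'blank ID'}) is None
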
@@ -1268,7 +1269,7 @@ def _load_ParameterDictionary(self, row):
temporal_context=temporal_parameter,
headers=self._get_system_actor_headers())
except NotFound as e:
- log.error('Missing parameter context %s', e.message)
+ log.error('Parameter dictionary %s missing context: %s', row['name'], e.message)
def _load_PlatformDevice(self, row):
contacts = self._get_contacts(row, field='contact_ids', type='InstrumentDevice')
@@ -1406,6 +1407,11 @@ def _load_SensorDevice(self, row):
headers=self._get_system_actor_headers())
self._resource_advance_lcs(row, res_id, "SensorDevice")
+ def _load_StreamConfiguration(self, row):
+ """ parse and save for use in *AgentInstance objects """
+ obj = self._create_object_from_row("StreamConfiguration", row, "cfg/")
+ self.stream_config[row['ID']] = obj
+
def _load_InstrumentAgent(self, row):
res_id = self._basic_resource_create(row, "InstrumentAgent", "ia/",
"instrument_management", "create_instrument_agent",
@@ -1468,6 +1474,9 @@ def _load_InstrumentAgentInstance(self, row):
'data_port': int(row['comms_server_port']),
'log_level': 5, }
+ stream_config_names = self._get_typed_value(row['stream_configurations'], targettype="simplelist")
+ agent_instance.stream_configurations = [ self.stream_config[name] for name in stream_config_names ]
+
# save
agent_id = self.resource_ids[row["instrument_agent_id"]]
device_id = self.resource_ids[row["instrument_device_id"]]
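
The new StreamConfiguration rows are parsed once, cached by their ID cell, and later attached to agent instances whose stream_configurations column lists them by name. A dependency-free sketch of that two-step pattern (dicts stand in for IonObjects, and the column names other than 'ID' and 'stream_configurations' are hypothetical):

    stream_config = {}   # name -> parsed StreamConfiguration, filled during preload

    def load_stream_configuration(row):
        # the real loader builds an IonObject from the 'cfg/' columns; a dict stands in
        stream_config[row['ID']] = {'stream_name': row.get('cfg/stream_name')}

    load_stream_configuration({'ID': 'raw', 'cfg/stream_name': 'raw'})
    load_stream_configuration({'ID': 'parsed', 'cfg/stream_name': 'parsed'})

    # later, an *AgentInstance row references the cached objects by name
    agent_row = {'stream_configurations': 'raw,parsed'}
    names = [n.strip() for n in agent_row['stream_configurations'].split(',')]   # "simplelist"
    stream_configurations = [stream_config[name] for name in names]
    assert len(stream_configurations) == 2
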
@@ -1532,6 +1541,9 @@ def _load_PlatformAgentInstance(self, row):
'driver_config': driver_config }
agent_instance.agent_config = { 'platform_config': platform_config }
+ stream_config_names = self._get_typed_value(row['stream_configurations'], targettype="simplelist")
+ agent_instance.stream_configurations = [ self.stream_config[name] for name in stream_config_names ]
+
id = self._get_service_client("instrument_management").create_platform_agent_instance(
agent_instance, platform_agent_id, platform_device_id, headers=self._get_system_actor_headers())
self.resource_ids[row['ID']] = id
@@ -1908,13 +1920,14 @@ def _load_Workflow(self,row):
configuration = self._get_typed_value(configuration, targettype="dict")
configuration["in_dp_id"] = in_dp_id
- persist_data = False
+ persist_data_flag = False
if row["persist_data"] == "TRUE":
- persist_data = True
+ persist_data_flag = True
+
#Create and start the workflow
workflow_id, workflow_product_id = workflow_client.create_data_process_workflow(
workflow_definition_id=workflow_def_id,
- input_data_product_id=in_dp_id, persist_workflow_data_product=persist_data, configuration=configuration, timeout=30,
+ input_data_product_id=in_dp_id, persist_workflow_data_product=persist_data_flag, configuration=configuration, timeout=30,
headers=self._get_system_actor_headers())
def _load_Deployment(self,row):
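
Beyond the rename, the hunk shows the loader's convention for reading a boolean out of a spreadsheet cell. A tiny sketch of that conversion (the strict "TRUE" comparison mirrors the original; the helper name is ours):

    def cell_is_true(row, column):
        # spreadsheet cells arrive as strings; anything other than "TRUE" is treated as False
        return row.get(column, '') == "TRUE"

    assert cell_is_true({'persist_data': 'TRUE'}, 'persist_data') is True
    assert cell_is_true({'persist_data': 'FALSE'}, 'persist_data') is False
    assert cell_is_true({}, 'persist_data') is False
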
@@ -27,7 +27,7 @@ def test_lca_load(self):
self.assertTrue(len(res) > 1)
found = False
for org in res:
- if org.name=='RSN':
+ if org.name=='Regional_Scale_Nodes':
self.assertFalse(found, msg='Found more than one Org "RSN" -- should have preloaded one')
found = True
self.assertFalse(org.contacts is None)
@@ -37,8 +37,9 @@ def test_lca_load(self):
# check data product
# res,_ = self.container.resource_registry.find_resources(RT.DataProduct, name='CTDBP-1012-REC1 Raw', id_only=False)
- res,_ = self.container.resource_registry.find_resources(RT.DataProduct, name='IOC Demo - CTDBP-1012-REC1 Raw', id_only=False)
- self.assertEquals(1, len(res))
+ target_name = 'NSF Demo - CTDBP-1012-REC1 Raw'
+ res,_ = self.container.resource_registry.find_resources(RT.DataProduct, name=target_name, id_only=False)
+ self.assertEquals(1, len(res), msg='failed to find data product: '+target_name)
dp = res[0]
formats = dp.available_formats
self.assertEquals(2, len(formats))
@@ -96,3 +97,12 @@ def test_lca_load(self):
# check for platform agents
res,_ = self.container.resource_registry.find_resources(RT.PlatformAgentInstance, id_only=False)
self.assertTrue(len(res)>0)
+ agent_instance = None
+ for pai in res:
+ if pai.name=='Platform Agent':
+ agent_instance = pai
+ break
+ self.assertTrue(agent_instance)
+ self.assertEquals(2, len(agent_instance.stream_configurations))
+ parsed = agent_instance.stream_configurations[1]
+ self.assertEquals('platform_eng_parsed', parsed.parameter_dictionary_name)
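
The added assertions scan the query results for the preloaded 'Platform Agent' instance and check its two stream configurations. An equivalent standalone lookup (plain dicts instead of registry resources) using next(), shown only as an idiom comparison with the added for-loop:

    res = [
        {'name': 'Platform Agent', 'stream_configurations': ['raw', 'platform_eng_parsed']},
        {'name': 'Other Agent', 'stream_configurations': []},
    ]

    agent_instance = next((r for r in res if r['name'] == 'Platform Agent'), None)
    assert agent_instance is not None
    assert len(agent_instance['stream_configurations']) == 2
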
@@ -13,7 +13,8 @@
import StringIO
import time
from interface.services.dm.ipubsub_management_service import PubsubManagementServiceProcessClient
-from ion.core.process.transform import TransformStreamPublisher, TransformEventListener
+from ion.core.process.transform import TransformStreamPublisher, TransformEventListener, TransformStreamListener
+
from interface.services.cei.ischeduler_service import SchedulerServiceProcessClient
from interface.services.coi.iresource_registry_service import ResourceRegistryServiceProcessClient
from pyon.event.event import EventSubscriber
@@ -34,14 +35,15 @@
mpl_output_image_format = "png"
-class VizTransformMatplotlibGraphs(TransformStreamPublisher, TransformEventListener):
+class VizTransformMatplotlibGraphs(TransformStreamPublisher, TransformEventListener, TransformStreamListener):
"""
This class is used for instantiating worker processes that have subscriptions to data streams and convert
incoming data from CDM format to Matplotlib graphs
"""
output_bindings = ['graph_image_param_dict']
+ event_timer_interval = None
def on_start(self):
@@ -64,17 +66,28 @@ def on_start(self):
graph_time_periods= self.CFG.get_safe('graph_time_periods')
# If this is meant to be an event driven process, schedule an event to be generated every few minutes/hours
- event_timer_interval = self.CFG.get_safe('graph_update_interval')
- if event_timer_interval:
+ self.event_timer_interval = self.CFG.get_safe('graph_update_interval')
+ if self.event_timer_interval:
event_origin = "Interval_Timer_Matplotlib"
sub = EventSubscriber(event_type="ResourceEvent", callback=self.interval_timer_callback, origin=event_origin)
sub.start()
- self.interval_timer_id = self.ssclient.create_interval_timer(start_time="now" , interval=self._str_to_secs(event_timer_interval),
+ self.interval_timer_id = self.ssclient.create_interval_timer(start_time="now" , interval=self._str_to_secs(self.event_timer_interval),
event_origin=event_origin, event_subtype="")
super(VizTransformMatplotlibGraphs,self).on_start()
+ # when transform is used as a data process
+ def recv_packet(self, packet, in_stream_route, in_stream_id):
+ #Check to see if the class instance was set up as an event triggered transform. If yes, skip the packet
+ if self.event_timer_interval:
+ return
+
+ log.info('Received packet')
+ mpl_data_granule = VizTransformMatplotlibGraphsAlgorithm.execute(packet, params=self.get_stream_definition())
+ for stream_name in self.stream_names:
+ publisher = getattr(self, stream_name)
+ publisher.publish(mpl_data_granule)
def get_stream_definition(self):
stream_id = self.stream_ids[0]
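
The Matplotlib transform now mixes in TransformStreamListener and gains a recv_packet() that is a no-op whenever the instance was configured with a graph_update_interval, so one class can run either event-driven or as a stream-fed data process. A dependency-free sketch of that dual-mode guard (no pyon imports; the render step is a stand-in for the Matplotlib algorithm):

    class GraphTransform(object):
        def __init__(self, update_interval=None):
            # mirrors self.event_timer_interval = CFG.get_safe('graph_update_interval')
            self.event_timer_interval = update_interval

        def recv_packet(self, packet):
            if self.event_timer_interval:
                return None              # event-driven instance: ignore streamed packets
            return self.render(packet)   # data-process instance: render every packet

        def render(self, packet):
            return {'graph_image_param_dict': packet}   # stand-in for the real algorithm

    assert GraphTransform(update_interval="5 minutes").recv_packet({'t': 1}) is None
    assert GraphTransform().recv_packet({'t': 1}) is not None
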
@@ -3,7 +3,7 @@
import unittest, os
from nose.plugins.attrib import attr
-from pyon.public import CFG, RT, LCS, PRED,IonObject
+from pyon.public import log, CFG, RT, LCS, PRED,IonObject
from interface.services.cei.iprocess_dispatcher_service import ProcessDispatcherServiceProcessClient
from interface.services.coi.iresource_registry_service import ResourceRegistryServiceProcessClient
@@ -262,16 +262,16 @@ def test_google_dt_transform_workflow(self):
self.workflowclient.terminate_data_process_workflow(workflow_id, False) # Should test true at some point
#Validate the data from each of the messages along the way
- #self.validate_google_dt_transform_results(results)
+ self.validate_google_dt_transform_results(results)
"""
# Check to see if ingestion worked. Extract the granules from data_retrieval.
# First find the dataset associated with the output dp product
ds_ids,_ = self.rrclient.find_objects(workflow_dp_ids[len(workflow_dp_ids) - 1], PRED.hasDataset, RT.DataSet, True)
- retrieve_granule = self.data_retriever.retrieve(ds_ids[0])
+ retrieved_granule = self.data_retriever.retrieve(ds_ids[0])
#Validate the data from each of the messages along the way
- self.validate_google_dt_transform_results(retrieve_granule)
+ self.validate_google_dt_transform_results(retrieved_granule)
"""
#Cleanup to make sure delete is correct.
@@ -334,10 +334,10 @@ def test_mpl_graphs_transform_workflow(self):
# Check to see if ingestion worked. Extract the granules from data_retrieval.
# First find the dataset associated with the output dp product
ds_ids,_ = self.rrclient.find_objects(workflow_dp_ids[len(workflow_dp_ids) - 1], PRED.hasDataset, RT.DataSet, True)
- retrieve_granule = self.data_retriever.retrieve_last_granule(ds_ids[0])
+ retrieved_granule = self.data_retriever.retrieve_last_granule(ds_ids[0])
#Validate the data from each of the messages along the way
- self.validate_mpl_graphs_transform_results(retrieve_granule)
+ self.validate_mpl_graphs_transform_results(retrieved_granule)
#Cleanup to make sure delete is correct.
self.workflowclient.delete_workflow_definition(workflow_def_id)