Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Matplotlib graphs can be ingested and retrieved
  • Loading branch information
rajvikram committed Dec 5, 2012
1 parent 5221a88 commit 0b80865
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 14 deletions.
6 changes: 3 additions & 3 deletions ion/processes/bootstrap/ion_loader.py
Expand Up @@ -1870,12 +1870,12 @@ def _load_Workflow(self,row):
configuration = self._get_typed_value(configuration, targettype="dict") configuration = self._get_typed_value(configuration, targettype="dict")
configuration["in_dp_id"] = in_dp_id configuration["in_dp_id"] = in_dp_id


persist_data = False persist_data_flag = False
if row["persist_data"] == "TRUE": if row["persist_data"] == "TRUE":
persist_data = True persist_data_flag = True
#Create and start the workflow #Create and start the workflow
workflow_id, workflow_product_id = workflow_client.create_data_process_workflow(workflow_definition_id=workflow_def_id, workflow_id, workflow_product_id = workflow_client.create_data_process_workflow(workflow_definition_id=workflow_def_id,
input_data_product_id=in_dp_id, persist_workflow_data_product=persist_data, configuration=configuration, timeout=30) input_data_product_id=in_dp_id, persist_workflow_data_product=persist_data_flag, configuration=configuration, timeout=30)


def _load_Deployment(self,row): def _load_Deployment(self,row):
constraints = self._get_constraints(row, type='Deployment') constraints = self._get_constraints(row, type='Deployment')
Expand Down
23 changes: 18 additions & 5 deletions ion/processes/data/transforms/viz/matplotlib_graphs.py
Expand Up @@ -13,7 +13,8 @@
import StringIO import StringIO
import time import time
from interface.services.dm.ipubsub_management_service import PubsubManagementServiceProcessClient from interface.services.dm.ipubsub_management_service import PubsubManagementServiceProcessClient
from ion.core.process.transform import TransformStreamPublisher, TransformEventListener from ion.core.process.transform import TransformStreamPublisher, TransformEventListener, TransformStreamListener

from interface.services.cei.ischeduler_service import SchedulerServiceProcessClient from interface.services.cei.ischeduler_service import SchedulerServiceProcessClient
from interface.services.coi.iresource_registry_service import ResourceRegistryServiceProcessClient from interface.services.coi.iresource_registry_service import ResourceRegistryServiceProcessClient
from pyon.event.event import EventSubscriber from pyon.event.event import EventSubscriber
Expand All @@ -34,14 +35,15 @@


mpl_output_image_format = "png" mpl_output_image_format = "png"


class VizTransformMatplotlibGraphs(TransformStreamPublisher, TransformEventListener): class VizTransformMatplotlibGraphs(TransformStreamPublisher, TransformEventListener, TransformStreamListener):


""" """
This class is used for instantiating worker processes that have subscriptions to data streams and convert This class is used for instantiating worker processes that have subscriptions to data streams and convert
incoming data from CDM format to Matplotlib graphs incoming data from CDM format to Matplotlib graphs
""" """
output_bindings = ['graph_image_param_dict'] output_bindings = ['graph_image_param_dict']
event_timer_interval = None




def on_start(self): def on_start(self):
Expand All @@ -64,17 +66,28 @@ def on_start(self):
graph_time_periods= self.CFG.get_safe('graph_time_periods') graph_time_periods= self.CFG.get_safe('graph_time_periods')


# If this is meant to be an event driven process, schedule an event to be generated every few minutes/hours # If this is meant to be an event driven process, schedule an event to be generated every few minutes/hours
event_timer_interval = self.CFG.get_safe('graph_update_interval') self.event_timer_interval = self.CFG.get_safe('graph_update_interval')
if event_timer_interval: if self.event_timer_interval:
event_origin = "Interval_Timer_Matplotlib" event_origin = "Interval_Timer_Matplotlib"
sub = EventSubscriber(event_type="ResourceEvent", callback=self.interval_timer_callback, origin=event_origin) sub = EventSubscriber(event_type="ResourceEvent", callback=self.interval_timer_callback, origin=event_origin)
sub.start() sub.start()


self.interval_timer_id = self.ssclient.create_interval_timer(start_time="now" , interval=self._str_to_secs(event_timer_interval), self.interval_timer_id = self.ssclient.create_interval_timer(start_time="now" , interval=self._str_to_secs(self.event_timer_interval),
event_origin=event_origin, event_subtype="") event_origin=event_origin, event_subtype="")


super(VizTransformMatplotlibGraphs,self).on_start() super(VizTransformMatplotlibGraphs,self).on_start()


# when tranform is used as a data process
def recv_packet(self, packet, in_stream_route, in_stream_id):
#Check to see if the class instance was set up as a event triggered transform. If yes, skip the packet
if self.event_timer_interval:
return

log.info('Received packet')
mpl_data_granule = VizTransformMatplotlibGraphsAlgorithm.execute(packet, params=self.get_stream_definition())
for stream_name in self.stream_names:
publisher = getattr(self, stream_name)
publisher.publish(mpl_data_granule)


def get_stream_definition(self): def get_stream_definition(self):
stream_id = self.stream_ids[0] stream_id = self.stream_ids[0]
Expand Down
12 changes: 6 additions & 6 deletions ion/services/ans/test/test_workflow.py
Expand Up @@ -3,7 +3,7 @@
import unittest, os import unittest, os
from nose.plugins.attrib import attr from nose.plugins.attrib import attr


from pyon.public import CFG, RT, LCS, PRED,IonObject from pyon.public import log, CFG, RT, LCS, PRED,IonObject


from interface.services.cei.iprocess_dispatcher_service import ProcessDispatcherServiceProcessClient from interface.services.cei.iprocess_dispatcher_service import ProcessDispatcherServiceProcessClient
from interface.services.coi.iresource_registry_service import ResourceRegistryServiceProcessClient from interface.services.coi.iresource_registry_service import ResourceRegistryServiceProcessClient
Expand Down Expand Up @@ -262,16 +262,16 @@ def test_google_dt_transform_workflow(self):
self.workflowclient.terminate_data_process_workflow(workflow_id, False) # Should test true at some point self.workflowclient.terminate_data_process_workflow(workflow_id, False) # Should test true at some point


#Validate the data from each of the messages along the way #Validate the data from each of the messages along the way
#self.validate_google_dt_transform_results(results) self.validate_google_dt_transform_results(results)


""" """
# Check to see if ingestion worked. Extract the granules from data_retrieval. # Check to see if ingestion worked. Extract the granules from data_retrieval.
# First find the dataset associated with the output dp product # First find the dataset associated with the output dp product
ds_ids,_ = self.rrclient.find_objects(workflow_dp_ids[len(workflow_dp_ids) - 1], PRED.hasDataset, RT.DataSet, True) ds_ids,_ = self.rrclient.find_objects(workflow_dp_ids[len(workflow_dp_ids) - 1], PRED.hasDataset, RT.DataSet, True)
retrieve_granule = self.data_retriever.retrieve(ds_ids[0]) retrieved_granule = self.data_retriever.retrieve(ds_ids[0])
#Validate the data from each of the messages along the way #Validate the data from each of the messages along the way
self.validate_google_dt_transform_results(retrieve_granule) self.validate_google_dt_transform_results(retrieved_granule)
""" """


#Cleanup to make sure delete is correct. #Cleanup to make sure delete is correct.
Expand Down Expand Up @@ -334,10 +334,10 @@ def test_mpl_graphs_transform_workflow(self):
# Check to see if ingestion worked. Extract the granules from data_retrieval. # Check to see if ingestion worked. Extract the granules from data_retrieval.
# First find the dataset associated with the output dp product # First find the dataset associated with the output dp product
ds_ids,_ = self.rrclient.find_objects(workflow_dp_ids[len(workflow_dp_ids) - 1], PRED.hasDataset, RT.DataSet, True) ds_ids,_ = self.rrclient.find_objects(workflow_dp_ids[len(workflow_dp_ids) - 1], PRED.hasDataset, RT.DataSet, True)
retrieve_granule = self.data_retriever.retrieve_last_granule(ds_ids[0]) retrieved_granule = self.data_retriever.retrieve_last_granule(ds_ids[0])


#Validate the data from each of the messages along the way #Validate the data from each of the messages along the way
self.validate_mpl_graphs_transform_results(retrieve_granule) self.validate_mpl_graphs_transform_results(retrieved_granule)


#Cleanup to make sure delete is correct. #Cleanup to make sure delete is correct.
self.workflowclient.delete_workflow_definition(workflow_def_id) self.workflowclient.delete_workflow_definition(workflow_def_id)
Expand Down

0 comments on commit 0b80865

Please sign in to comment.