Skip to content

Commit

Permalink
Create RDT in stream publisher using serialized stream def object.
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardhunter2 committed May 28, 2013
1 parent 3d2d1c3 commit 64608ff
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 10 deletions.
24 changes: 20 additions & 4 deletions ion/agents/agent_stream_publisher.py
Expand Up @@ -27,6 +27,11 @@
from ion.services.dm.utility.granule_utils import RecordDictionaryTool
from interface.objects import StreamRoute

# Pyon Object Serialization
from pyon.core.bootstrap import get_obj_registry
from pyon.core.object import IonObjectDeserializer


class AgentStreamPublisher(object):
"""
"""
Expand All @@ -53,11 +58,22 @@ def __init__(self, agent):
agent.aparam_set_streams = self.aparam_set_streams

def _construct_streams(self, stream_info):
decoder = IonObjectDeserializer(obj_registry=get_obj_registry())
for (stream_name, config) in stream_info.iteritems():
stream_def = config['stream_definition_ref']
rdt = RecordDictionaryTool(stream_definition_id=stream_def)
self._agent.aparam_streams[stream_name] = rdt.fields
self._agent.aparam_pubrate[stream_name] = 0
try:
#stream_def = config['stream_definition_ref']
stream_def_dict = config['stream_def_dict']
stream_def_obj = decoder.deserialize(stream_def_dict)
#rdt = RecordDictionaryTool(stream_definition_id=stream_def)
rdt = RecordDictionaryTool(stream_definition=stream_def_obj)
self._agent.aparam_streams[stream_name] = rdt.fields
self._agent.aparam_pubrate[stream_name] = 0
except Exception as e:
errmsg = 'Instrument agent %s' % self._agent._proc_name
errmsg += 'error constructing stream %s. ' % stream_name
errmsg += str(e)
log.error(errmsg)

self._agent.aparam_set_pubrate = self.aparam_set_pubrate

def _construct_publishers(self, stream_info):
Expand Down
19 changes: 13 additions & 6 deletions ion/agents/instrument/test/test_instrument_agent.py
Expand Up @@ -40,8 +40,7 @@
from pyon.util.int_test import IonIntegrationTestCase

# Pyon Object Serialization
from pyon.core.bootstrap import get_obj_registry
from pyon.core.object import IonObjectDeserializer
from pyon.core.object import IonObjectSerializer

# Pyon exceptions.
from pyon.core.exception import BadRequest, Conflict, Timeout, ResourceError
Expand Down Expand Up @@ -396,37 +395,45 @@ def _build_stream_config(self):
pubsub_client = PubsubManagementServiceClient(node=self.container.node)
dataset_management = DatasetManagementServiceClient()

encoder = IonObjectSerializer()

# Create streams and subscriptions for each stream named in driver.
self._stream_config = {}

stream_name = 'parsed'
param_dict_name = 'ctd_parsed_param_dict'
pd_id = dataset_management.read_parameter_dictionary_by_name(param_dict_name, id_only=True)
stream_def_id = pubsub_client.create_stream_definition(name=stream_name, parameter_dictionary_id=pd_id)
pd = pubsub_client.read_stream_definition(stream_def_id).parameter_dictionary
stream_def = pubsub_client.read_stream_definition(stream_def_id)
stream_def_dict = encoder.serialize(stream_def)
pd = stream_def.parameter_dictionary
stream_id, stream_route = pubsub_client.create_stream(name=stream_name,
exchange_point='science_data',
stream_definition_id=stream_def_id)
stream_config = dict(routing_key=stream_route.routing_key,
exchange_point=stream_route.exchange_point,
stream_id=stream_id,
stream_definition_ref=stream_def_id,
parameter_dictionary=pd)
parameter_dictionary=pd,
stream_def_dict=stream_def_dict)
self._stream_config[stream_name] = stream_config

stream_name = 'raw'
param_dict_name = 'ctd_raw_param_dict'
pd_id = dataset_management.read_parameter_dictionary_by_name(param_dict_name, id_only=True)
stream_def_id = pubsub_client.create_stream_definition(name=stream_name, parameter_dictionary_id=pd_id)
pd = pubsub_client.read_stream_definition(stream_def_id).parameter_dictionary
stream_def = pubsub_client.read_stream_definition(stream_def_id)
stream_def_dict = encoder.serialize(stream_def)
pd = stream_def.parameter_dictionary
stream_id, stream_route = pubsub_client.create_stream(name=stream_name,
exchange_point='science_data',
stream_definition_id=stream_def_id)
stream_config = dict(routing_key=stream_route.routing_key,
exchange_point=stream_route.exchange_point,
stream_id=stream_id,
stream_definition_ref=stream_def_id,
parameter_dictionary=pd)
parameter_dictionary=pd,
stream_def_dict=stream_def_dict)
self._stream_config[stream_name] = stream_config

def _start_data_subscribers(self, count, raw_count):
Expand Down

0 comments on commit 64608ff

Please sign in to comment.