Skip to content

Commit

Permalink
Allow old RDT config for now.
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardhunter2 committed May 28, 2013
1 parent 27e685a commit 688c2df
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 10 deletions.
24 changes: 15 additions & 9 deletions ion/agents/agent_stream_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,15 @@ def _construct_streams(self, stream_info):
decoder = IonObjectDeserializer(obj_registry=get_obj_registry())
for (stream_name, config) in stream_info.iteritems():
try:
#stream_def = config['stream_definition_ref']
stream_def_dict = config['stream_def_dict']
stream_def_obj = decoder.deserialize(stream_def_dict)
#rdt = RecordDictionaryTool(stream_definition_id=stream_def)
rdt = RecordDictionaryTool(stream_definition=stream_def_obj)
if config.has_key('stream_def_dict'):
stream_def = config['stream_definition_ref']
self._stream_defs[stream_name] = stream_def
rdt = RecordDictionaryTool(stream_definition_id=stream_def)
else:
stream_def_dict = config['stream_def_dict']
stream_def_obj = decoder.deserialize(stream_def_dict)
self._stream_defs[stream_name] = stream_def_obj
rdt = RecordDictionaryTool(stream_definition=stream_def_obj)
self._agent.aparam_streams[stream_name] = rdt.fields
self._agent.aparam_pubrate[stream_name] = 0
except Exception as e:
Expand All @@ -78,9 +82,7 @@ def _construct_streams(self, stream_info):

def _construct_publishers(self, stream_info):
for (stream_name, stream_config) in stream_info.iteritems():
try:
stream_def = stream_config['stream_definition_ref']
self._stream_defs[stream_name] = stream_def
try:
exchange_point = stream_config['exchange_point']
routing_key = stream_config['routing_key']
route = StreamRoute(exchange_point=exchange_point,
Expand Down Expand Up @@ -169,7 +171,11 @@ def _publish_stream_buffer(self, stream_name):
return

stream_def = self._stream_defs[stream_name]
rdt = RecordDictionaryTool(stream_definition_id=stream_def)
if isinstance(stream_def, str):
rdt = RecordDictionaryTool(stream_definition_id=stream_def)
else:
rdt = RecordDictionaryTool(stream_definition=stream_def)

publisher = self._publishers[stream_name]

vals = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def start_instrument_agent_process(container, stream_config={}, resource_id=IA_R
return ia_client

@attr('HARDWARE', group='sa')
@patch.dict(CFG, {'endpoint':{'receive':{'timeout': 120}}})
@patch.dict(CFG, {'endpoint':{'receive':{'timeout': 300}}})
class TestAgentConnectionFailures(IonIntegrationTestCase):
"""
Test cases for instrument agent class. Functions in this class provide
Expand Down

0 comments on commit 688c2df

Please sign in to comment.