Merge branch 'master' of github.com:ooici/coi-services

commit 345b5d0e73dcf9e86929b99dde517492b43413ea 2 parents fd652ff + 9f28d74
MauriceManning authored
2  extern/pyon
@@ -1 +1 @@
-Subproject commit f68ddb11f10ab2562a07f8bc212665d91d6e4df9
+Subproject commit 0871a5f2b100d7fba58b7f8ad37892878d9dbdc3
2  ion/agents/instrument/instrument_agent.py
@@ -168,7 +168,7 @@ def on_init(self):
# Construct stream publishers.
self._construct_data_publishers()
- if self._is_policy_enabled():
+ if self._is_governance_enabled():
self.container.governance_controller.register_process_operation_precondition(self, 'execute_resource', self.check_execute_resource)
self.container.governance_controller.register_process_operation_precondition(self, 'set_resource', self.check_set_resource)
self.container.governance_controller.register_process_operation_precondition(self, 'ping_resource', self.check_ping_resource)
2  ion/agents/platform/platform_agent.py
@@ -219,7 +219,7 @@ def on_init(self):
Init objects that depend on the container services and start state
machine.
"""
- if self._is_policy_enabled():
+ if self._is_governance_enabled():
self.container.governance_controller.register_process_operation_precondition(self, 'execute_resource', self.check_execute_resource)
self.container.governance_controller.register_process_operation_precondition(self, 'set_resource', self.check_set_resource)
self.container.governance_controller.register_process_operation_precondition(self, 'ping_resource', self.check_ping_resource)
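Note: both the instrument and platform agents now key precondition registration off _is_governance_enabled() instead of the old _is_policy_enabled(). A condensed sketch of the shared pattern (names taken from the hunks above; the loop is an illustrative rewrite, not the committed code):

    def on_init(self):
        # Register operation preconditions only when governance is
        # enabled for the container (previously gated on the policy flag).
        if self._is_governance_enabled():
            gc = self.container.governance_controller
            for op in ('execute_resource', 'set_resource', 'ping_resource'):
                gc.register_process_operation_precondition(
                    self, op, getattr(self, 'check_' + op))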
20 ion/processes/bootstrap/load_system_policy.py
@@ -609,13 +609,13 @@ def op_load_system_policies(cls, calling_process):
<Actions>
<Action>
- <ActionMatch MatchId="urn:oasis:names:tc:xacml:1.0:function:string-regexp-match">
+ <ActionMatch MatchId="urn:oasis:names:tc:xacml:1.0:function:string-equal">
<AttributeValue DataType="http://www.w3.org/2001/XMLSchema#string">negotiate</AttributeValue>
<ActionAttributeDesignator AttributeId="urn:oasis:names:tc:xacml:1.0:action:action-id" DataType="http://www.w3.org/2001/XMLSchema#string"/>
</ActionMatch>
</Action>
<Action>
- <ActionMatch MatchId="urn:oasis:names:tc:xacml:1.0:function:string-regexp-match">
+ <ActionMatch MatchId="urn:oasis:names:tc:xacml:1.0:function:string-equal">
<AttributeValue DataType="http://www.w3.org/2001/XMLSchema#string">get_capabilities</AttributeValue>
<ActionAttributeDesignator AttributeId="urn:oasis:names:tc:xacml:1.0:action:action-id" DataType="http://www.w3.org/2001/XMLSchema#string"/>
</ActionMatch>
@@ -674,35 +674,41 @@ def op_load_system_policies(cls, calling_process):
<Actions>
<Action>
- <ActionMatch MatchId="urn:oasis:names:tc:xacml:1.0:function:string-regexp-match">
+ <ActionMatch MatchId="urn:oasis:names:tc:xacml:1.0:function:string-equal">
<AttributeValue DataType="http://www.w3.org/2001/XMLSchema#string">get_resource_state</AttributeValue>
<ActionAttributeDesignator AttributeId="urn:oasis:names:tc:xacml:1.0:action:action-id" DataType="http://www.w3.org/2001/XMLSchema#string"/>
</ActionMatch>
</Action>
<Action>
- <ActionMatch MatchId="urn:oasis:names:tc:xacml:1.0:function:string-regexp-match">
+ <ActionMatch MatchId="urn:oasis:names:tc:xacml:1.0:function:string-equal">
<AttributeValue DataType="http://www.w3.org/2001/XMLSchema#string">get_resource</AttributeValue>
<ActionAttributeDesignator AttributeId="urn:oasis:names:tc:xacml:1.0:action:action-id" DataType="http://www.w3.org/2001/XMLSchema#string"/>
</ActionMatch>
</Action>
<Action>
- <ActionMatch MatchId="urn:oasis:names:tc:xacml:1.0:function:string-regexp-match">
+ <ActionMatch MatchId="urn:oasis:names:tc:xacml:1.0:function:string-equal">
<AttributeValue DataType="http://www.w3.org/2001/XMLSchema#string">set_resource</AttributeValue>
<ActionAttributeDesignator AttributeId="urn:oasis:names:tc:xacml:1.0:action:action-id" DataType="http://www.w3.org/2001/XMLSchema#string"/>
</ActionMatch>
</Action>
<Action>
- <ActionMatch MatchId="urn:oasis:names:tc:xacml:1.0:function:string-regexp-match">
+ <ActionMatch MatchId="urn:oasis:names:tc:xacml:1.0:function:string-equal">
<AttributeValue DataType="http://www.w3.org/2001/XMLSchema#string">execute_resource</AttributeValue>
<ActionAttributeDesignator AttributeId="urn:oasis:names:tc:xacml:1.0:action:action-id" DataType="http://www.w3.org/2001/XMLSchema#string"/>
</ActionMatch>
</Action>
<Action>
- <ActionMatch MatchId="urn:oasis:names:tc:xacml:1.0:function:string-regexp-match">
+ <ActionMatch MatchId="urn:oasis:names:tc:xacml:1.0:function:string-equal">
<AttributeValue DataType="http://www.w3.org/2001/XMLSchema#string">ping_resource</AttributeValue>
<ActionAttributeDesignator AttributeId="urn:oasis:names:tc:xacml:1.0:action:action-id" DataType="http://www.w3.org/2001/XMLSchema#string"/>
</ActionMatch>
</Action>
+ <Action>
+ <ActionMatch MatchId="urn:oasis:names:tc:xacml:1.0:function:string-equal">
+ <AttributeValue DataType="http://www.w3.org/2001/XMLSchema#string">get_agent_state</AttributeValue>
+ <ActionAttributeDesignator AttributeId="urn:oasis:names:tc:xacml:1.0:action:action-id" DataType="http://www.w3.org/2001/XMLSchema#string"/>
+ </ActionMatch>
+ </Action>
</Actions>
<Subjects>
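Note: the change from string-regexp-match to string-equal tightens action matching, since XACML's regexp match is unanchored and a pattern like get_resource would also match get_resource_state. A small Python sketch of the difference (illustrative only, not part of the commit):

    import re

    actions = ['get_resource', 'get_resource_state', 'execute_resource']

    # Unanchored regexp matching (the old behavior): the pattern
    # 'get_resource' also matches 'get_resource_state'.
    regexp_hits = [a for a in actions if re.search('get_resource', a)]
    assert regexp_hits == ['get_resource', 'get_resource_state']

    # Exact equality (the new behavior): only the intended action matches.
    equal_hits = [a for a in actions if a == 'get_resource']
    assert equal_hits == ['get_resource']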
15 ion/processes/data/ingestion/science_granule_ingestion_worker.py
@@ -146,8 +146,19 @@ def add_granule(self,stream_id, granule):
#--------------------------------------------------------------------------------
# Actual persistence
#--------------------------------------------------------------------------------
- covcraft = CoverageCraft(coverage)
- covcraft.sync_with_granule(granule)
+ rdt = RecordDictionaryTool.load_from_granule(granule)
+ start_index = coverage.num_timesteps
+ elements = len(rdt)
+ if not elements:
+ return
+ coverage.insert_timesteps(elements)
+
+ for k,v in rdt.iteritems():
+ log.info('key: %s', k)
+ log.info('value: %s', v)
+ slice_ = slice(start_index, None)
+ coverage.set_parameter_values(param_name=k, tdoa=slice_, value=v)
+
DatasetManagementService._persist_coverage(dataset_id,coverage)
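Note: the worker now appends granule contents to the coverage directly, extending the time axis by the number of records and then writing each field into the newly added slice. A minimal sketch of that append pattern, using a dict of numpy arrays as a hypothetical stand-in for the coverage object:

    import numpy as np

    # Hypothetical stand-in for a coverage: one array per parameter.
    store = {'time': np.array([0.0, 1.0]), 'temp': np.array([10.0, 10.5])}

    def append_records(store, records):
        # Mirrors add_granule: bail out on an empty granule, then extend
        # every parameter by the same number of elements.
        if not len(records['time']):
            return
        for k, v in records.items():
            store[k] = np.concatenate([store[k], np.asarray(v)])

    append_records(store, {'time': [2.0, 3.0], 'temp': [11.0, 11.2]})
    assert len(store['time']) == 4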
163 ion/processes/data/replay/replay_process.py
@@ -10,18 +10,24 @@
from pyon.core.object import IonObjectDeserializer
from pyon.core.bootstrap import get_obj_registry
from pyon.datastore.datastore import DataStore
+from pyon.util.arg_check import validate_is_instance
from pyon.util.log import log
from ion.services.dm.inventory.dataset_management_service import DatasetManagementService
from ion.services.dm.utility.granule import RecordDictionaryTool
-from ion.services.dm.utility.granule_utils import CoverageCraft
-from interface.services.dm.idataset_management_service import DatasetManagementServiceClient
+from interface.services.dm.idataset_management_service import DatasetManagementServiceProcessClient, DatasetManagementServiceClient
from interface.services.dm.ipubsub_management_service import PubsubManagementServiceProcessClient
from interface.services.dm.ireplay_process import BaseReplayProcess
from gevent.event import Event
+from numbers import Number
+import datetime
+import dateutil.parser
import gevent
+import netCDF4
+import numpy as np
+import time
class ReplayProcess(BaseReplayProcess):
@@ -43,6 +49,18 @@ class ReplayProcess(BaseReplayProcess):
'''
process_type = 'standalone'
publish_limit = 10
+ dataset_id = None
+ delivery_format = {}
+ start_time = None
+ end_time = None
+ stride_time = None
+ parameters = None
+ stream_id = ''
+ stream_def_id = ''
+
def __init__(self, *args, **kwargs):
super(ReplayProcess,self).__init__(*args,**kwargs)
@@ -57,7 +75,8 @@ def on_start(self):
'''
log.info('IVE BEEN STARTED!')
super(ReplayProcess,self).on_start()
- dsm_cli = DatasetManagementServiceClient()
+ dsm_cli = DatasetManagementServiceProcessClient(process=self)
+ pubsub = PubsubManagementServiceProcessClient(process=self)
self.dataset_id = self.CFG.get_safe('process.dataset_id', None)
self.delivery_format = self.CFG.get_safe('process.delivery_format',{})
@@ -66,6 +85,10 @@ def on_start(self):
self.stride_time = self.CFG.get_safe('process.query.stride_time', None)
self.parameters = self.CFG.get_safe('process.query.parameters',None)
self.publish_limit = self.CFG.get_safe('process.query.publish_limit', 10)
+ self.stream_id = self.CFG.get_safe('process.publish_streams.output', '')
+ self.stream_def = pubsub.read_stream_definition(stream_id=self.stream_id)
+ self.stream_def_id = self.stream_def._id
+
self.publishing.clear()
self.play.set()
self.end.clear()
@@ -76,7 +99,50 @@ def on_start(self):
self.dataset = dsm_cli.read_dataset(self.dataset_id)
self.pubsub = PubsubManagementServiceProcessClient(process=self)
-
+ @classmethod
+ def _coverage_to_granule(cls, coverage, start_time=None, end_time=None, stride_time=None, parameters=None, stream_def_id=None, tdoa=None):
+ slice_ = slice(None) # Defaults to all values
+
+ if tdoa is not None and isinstance(tdoa,slice):
+ slice_ = tdoa
+ elif stride_time is not None:
+ validate_is_instance(start_time, Number, 'start_time must be a number for striding.')
+ validate_is_instance(end_time, Number, 'end_time must be a number for striding.')
+ validate_is_instance(stride_time, Number, 'stride_time must be a number for striding.')
+ ugly_range = np.arange(start_time, end_time, stride_time)
+ idx_values = [cls.get_relative_time(coverage,i) for i in ugly_range]
+ slice_ = [idx_values]
+
+ elif not (start_time is None and end_time is None):
+ time_var = coverage._temporal_param_name
+ uom = coverage.get_parameter_context(time_var).uom
+ if start_time is not None:
+ start_units = cls.ts_to_units(uom,start_time)
+ log.info('Units: %s', start_units)
+ start_idx = cls.get_relative_time(coverage,start_units)
+ log.info('Start Index: %s', start_idx)
+ start_time = start_idx
+ if end_time is not None:
+ end_units = cls.ts_to_units(uom,end_time)
+ log.info('End units: %s', end_units)
+ end_idx = cls.get_relative_time(coverage,end_units)
+ log.info('End index: %s', end_idx)
+ end_time = end_idx
+ slice_ = slice(start_time,end_time,stride_time)
+ log.info('Slice: %s', slice_)
+
+ if stream_def_id:
+ rdt = RecordDictionaryTool(stream_definition_id=stream_def_id)
+ else:
+ rdt = RecordDictionaryTool(param_dictionary=coverage.parameter_dictionary)
+ if parameters is not None:
+ fields = list(set(parameters).intersection(rdt.fields))
+ else:
+ fields = rdt.fields
+
+ for field in fields:
+ rdt[field] = coverage.get_parameter_values(field, tdoa=slice_)
+ return rdt
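A hypothetical call to the new helper, assuming coverage is an open coverage object and 'temp' is a parameter it defines; start and end times are unix timestamps, converted internally via ts_to_units:

    rdt = ReplayProcess._coverage_to_granule(
        coverage,
        start_time=0.0,
        end_time=3600.0,
        parameters=['time', 'temp'])  # intersected with rdt.fields
    granule = rdt.to_granule()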
def execute_retrieve(self):
'''
@@ -84,11 +150,10 @@ def execute_retrieve(self):
as a value in lieu of publishing it on a stream
'''
coverage = DatasetManagementService._get_coverage(self.dataset_id)
- crafter = CoverageCraft(coverage)
- #@todo: add bounds checking to ensure the dataset being retrieved is not too large
- crafter.sync_rdt_with_coverage(start_time=self.start_time, end_time=self.end_time, stride_time=self.stride_time, parameters=self.parameters)
- granule = crafter.to_granule()
- return granule
+ rdt = self._coverage_to_granule(coverage,self.start_time, self.end_time, self.stride_time, self.parameters)
+ return rdt.to_granule()
+
+
def execute_replay(self):
'''
@@ -151,38 +216,86 @@ def get_last_granule(cls, container, dataset_id):
ts = float(doc.get('ts_create',0))
coverage = DatasetManagementService._get_coverage(dataset_id)
-
- black_box = CoverageCraft(coverage)
- black_box.sync_rdt_with_coverage(start_time=ts,end_time=None)
- granule = black_box.to_granule()
+ rdt = cls._coverage_to_granule(coverage,start_time=ts, end_time=None)
+ return rdt.to_granule()
- return granule
@classmethod
def get_last_values(cls, dataset_id):
coverage = DatasetManagementService._get_coverage(dataset_id)
+ rdt = cls._coverage_to_granule(coverage,tdoa=slice(-1,None))
- black_box = CoverageCraft(coverage)
- black_box.sync_rdt_with_coverage(tdoa=slice(-1,None))
- granule = black_box.to_granule()
- return granule
+ return rdt.to_granule()
def _replay(self):
coverage = DatasetManagementService._get_coverage(self.dataset_id)
- crafter = CoverageCraft(coverage)
- crafter.sync_rdt_with_coverage(start_time=self.start_time, end_time=self.end_time, stride_time=self.stride_time, parameters=self.parameters)
+ rdt = self._coverage_to_granule(coverage, self.start_time, self.end_time, self.stride_time, self.parameters, self.stream_def_id)
- elements = len(crafter.rdt)
- stream_id = self.output.stream_id
- stream_def = self.pubsub.read_stream_definition(stream_id=stream_id)
+
+ elements = len(rdt)
+
for i in xrange(elements / self.publish_limit):
- outgoing = RecordDictionaryTool(stream_definition_id=stream_def._id)
+ outgoing = RecordDictionaryTool(stream_definition_id=self.stream_def_id)
fields = self.parameters or outgoing.fields
for field in fields:
- outgoing[field] = crafter.rdt[field][(i*self.publish_limit) : ((i+1)*self.publish_limit)]
+ outgoing[field] = rdt[field][(i*self.publish_limit) : ((i+1)*self.publish_limit)]
yield outgoing
return
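Note the batching arithmetic: xrange(elements / self.publish_limit) uses integer division, so a trailing partial batch is never yielded. Worked out with hypothetical numbers (// matches Python 2's / on ints):

    elements, publish_limit = 25, 10
    batches = [(i * publish_limit, (i + 1) * publish_limit)
               for i in range(elements // publish_limit)]
    assert batches == [(0, 10), (10, 20)]
    # The last 5 records fall outside every slice and are not published.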
+ @classmethod
+ def get_relative_time(cls, coverage, time):
+ '''
+ Determines the relative time in the coverage model based on a given time
+ The time must match the coverage's time units
+ '''
+ time_name = coverage._temporal_param_name
+ pc = coverage.get_parameter_context(time_name)
+ units = pc.uom
+ if 'iso' in units:
+ return None # TODO: comparing ISO time strings is not yet supported.
+ values = coverage.get_parameter_values(time_name)
+ return cls.find_nearest(values,time)
+
+ @classmethod
+ def ts_to_units(cls,units, val):
+ '''
+ Converts a unix timestamp into various formats
+ Example:
+ ts = time.time()
+ ReplayProcess.ts_to_units('days since 2000-01-01', ts)
+ '''
+ if 'iso' in units:
+ return time.strftime('%Y-%m-%dT%H:%M:%S', time.gmtime(val))
+ elif 'since' in units:
+ t = netCDF4.netcdftime.utime(units)
+ return t.date2num(datetime.datetime.utcfromtimestamp(val))
+ else:
+ return val
+
+ @classmethod
+ def units_to_ts(cls, units, val):
+ '''
+ Converts known time formats into a unix timestamp
+ Example:
+ ts = ReplayProcess.units_to_ts('days since 2000-01-01', 1200)
+ '''
+ if 'since' in units:
+ t = netCDF4.netcdftime.utime(units)
+ dtg = t.num2date(val)
+ return time.mktime(dtg.timetuple())
+ elif 'iso' in units:
+ t = dateutil.parser.parse(val)
+ return time.mktime(t.timetuple())
+ else:
+ return val
+
+ @classmethod
+ def find_nearest(cls, arr, val):
+ '''
+ Returns the index of the element in a numpy array nearest to the given value
+ '''
+ idx = np.abs(arr-val).argmin()
+ return idx
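A sketch of the intended round trip through the new unit helpers (hypothetical values). Note that units_to_ts converts back through time.mktime, which treats the tuple as local time, while ts_to_units starts from UTC, so the round trip can be offset by the local UTC offset:

    import time

    ts = time.time()
    units = 'seconds since 2000-01-01'
    val = ReplayProcess.ts_to_units(units, ts)    # unix timestamp -> units value
    back = ReplayProcess.units_to_ts(units, val)  # units value -> unix timestamp
    # back can differ from ts by the local UTC offset (mktime vs. gmtime).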
6 ion/processes/data/transforms/ctd/test/test_ctd_transforms.py
@@ -6,7 +6,7 @@
'''
-
+import os
from pyon.ion.stream import StandaloneStreamPublisher
from pyon.public import log
from pyon.util.containers import DotDict
@@ -108,6 +108,8 @@ def test_transforms(self):
L2_dens = self.tx_L2_D.execute(packet)
log.debug("L2 dens: %s" % L2_dens)
+@attr('LOCOINT')
+@unittest.skipIf(os.getenv('CEI_LAUNCH_TEST', False), 'Skip test while in CEI LAUNCH mode')
@attr('INT', group='dm')
class CtdTransformsIntTest(IonIntegrationTestCase):
def setUp(self):
@@ -811,4 +813,4 @@ def _get_new_ctd_packet(self, parameter_dictionary, length):
g = rdt.to_granule()
- return g
+ return g
18 ion/services/coi/test/test_governance.py
@@ -1125,6 +1125,12 @@ def test_instrument_agent_policy(self):
retval = ia_client.get_resource(params, headers=user_header)
self.assertIn('(get_resource) has been denied',cm.exception.message)
+ #This agent operation should not be allowed for a user that is not an Instrument Operator
+ with self.assertRaises(Unauthorized) as cm:
+ retval = ia_client.get_agent_state(headers=user_header)
+ self.assertIn('(get_agent_state) has been denied',cm.exception.message)
+
#Grant the role of Instrument Operator to the user
self.org_client.grant_role(org2_id,user_id, INSTRUMENT_OPERATOR_ROLE, headers=self.sa_user_header)
@@ -1141,17 +1147,11 @@ def test_instrument_agent_policy(self):
retval = ia_client.get_resource(params, headers=user_header)
- #This agent operation should not be allowed for a user that is an Instrument Operator
- with self.assertRaises(Unauthorized) as cm:
- retval = ia_client.get_agent_state(headers=user_header)
- self.assertEqual(retval, ResourceAgentState.UNINITIALIZED)
- self.assertIn('(get_agent_state) has been denied',cm.exception.message)
-
-
- #This agent operation should be allowed for a the ION System Actor
- retval = ia_client.get_agent_state(headers=self.sa_user_header)
+ #This agent operation should now be allowed for a user that is an Instrument Operator
+ retval = ia_client.get_agent_state(headers=user_header)
self.assertEqual(retval, ResourceAgentState.UNINITIALIZED)
+
#The execute command should fail if the user has not acquired the resource
with self.assertRaises(Unauthorized) as cm:
cmd = AgentCommand(command=SBE37ProtocolEvent.ACQUIRE_SAMPLE)
29 ion/services/dm/inventory/data_retriever_service.py
@@ -80,7 +80,7 @@ def start_replay_agent(self, replay_id=''):
config = replay.config
pid = replay.process_id
-
+
self.clients.process_dispatcher.schedule_process(process_definition_id=process_definition_id, process_id=pid, configuration=config)
@@ -98,10 +98,14 @@ def cancel_replay_agent(self, replay_id=''):
def retrieve(self, dataset_id='', query=None, delivery_format=None, module='', cls='', kwargs=None):
'''
- Query can have the following parameters:
- start_time: Beginning time value
- end_time: Ending time value
- stride_time: The stride time
+ Retrieves a dataset.
+ @param dataset_id Dataset identifier
+ @param query Query parameters (start_time, end_time, stride_time, parameters)
+ @param delivery_format The stream definition identifier for the outgoing granule (stream_definition_id)
+ @param module Module to chain a transform into
+ @param cls Class of the transform
+ @param kwargs Keyword Arguments to pass into the transform.
+
'''
if query is None:
query = {}
@@ -114,13 +118,14 @@ def retrieve(self, dataset_id='', query=None, delivery_format=None, module='', c
replay_instance = ReplayProcess()
- replay_instance.dataset = self.clients.dataset_management.read_dataset(dataset_id)
- replay_instance.dataset_id = dataset_id
- replay_instance.start_time = query.get('start_time', None)
- replay_instance.end_time = query.get('end_time', None)
- replay_instance.stride_time = query.get('stride_time', None)
- replay_instance.parameters = query.get('parameters',None)
- replay_instance.container = self.container
+ replay_instance.dataset = self.clients.dataset_management.read_dataset(dataset_id)
+ replay_instance.dataset_id = dataset_id
+ replay_instance.start_time = query.get('start_time', None)
+ replay_instance.end_time = query.get('end_time', None)
+ replay_instance.stride_time = query.get('stride_time', None)
+ replay_instance.parameters = query.get('parameters',None)
+ replay_instance.stream_def_id = delivery_format
+ replay_instance.container = self.container
retrieve_data = replay_instance.execute_retrieve()
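A hypothetical client call matching the documented parameters (identifiers are placeholders):

    granule = data_retriever.retrieve(
        dataset_id=dataset_id,
        query={'start_time': t0, 'end_time': t1,
               'stride_time': None, 'parameters': ['time', 'temp']},
        delivery_format=stream_def_id)  # stream definition id for the output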
28 ion/services/dm/utility/granule/record_dictionary.py
@@ -9,15 +9,21 @@
'''
from pyon.core.exception import BadRequest
-from coverage_model.parameter import ParameterDictionary
-from coverage_model.parameter_values import get_value_class, AbstractParameterValue
-from coverage_model.coverage import SimpleDomainSet
-from pyon.util.log import log
+from pyon.core.object import IonObjectSerializer
+from pyon.core.interceptor.encode import encode_ion
from pyon.util.arg_check import validate_equal
+from pyon.util.log import log
+from pyon.util.memoize import memoize_lru
+
from interface.services.dm.ipubsub_management_service import PubsubManagementServiceClient
from interface.objects import Granule
-from pyon.util.memoize import memoize_lru
+
+from coverage_model.parameter import ParameterDictionary
+from coverage_model.parameter_values import get_value_class, AbstractParameterValue
+from coverage_model.coverage import SimpleDomainSet
+
import numpy as np
+import msgpack
class RecordDictionaryTool(object):
"""
@@ -238,6 +244,18 @@ def __eq__(self, comp):
def __ne__(self, comp):
return not (self == comp)
+
+ def size(self):
+ '''
+ Calculates the approximate size of a granule (inefficiently, by fully serializing it);
+ returns the size in bytes.
+ '''
+ granule = self.to_granule()
+ serializer = IonObjectSerializer()
+ flat = serializer.serialize(granule)
+ byte_stream = msgpack.packb(flat, default=encode_ion)
+ return len(byte_stream)
+
@staticmethod
@memoize_lru(maxsize=100)
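A usage sketch for the new size() helper (stream_def_id is a placeholder):

    import numpy as np

    rdt = RecordDictionaryTool(stream_definition_id=stream_def_id)
    rdt['time'] = np.arange(10)
    nbytes = rdt.size()  # serialized (msgpack) size of the granule, in bytes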
1  ion/services/dm/utility/granule_utils.py
@@ -279,3 +279,4 @@ def time_series_domain():
tdom = GridDomain(GridShape('temporal', [0]), tcrs, MutabilityEnum.EXTENSIBLE)
sdom = GridDomain(GridShape('spatial', [0]), scrs, MutabilityEnum.IMMUTABLE) # Dimensionality is excluded for now
return tdom, sdom
+
14 ion/services/sa/tcaa/remote_endpoint.py
@@ -289,9 +289,23 @@ def _result_complete(self, result):
def get_port(self):
"""
+ Get the remote server port number.
"""
return self._this_port
+ def set_client_port(self, port):
+ """
+ Set the remote client port number.
+ """
+ self._other_port = port
+
+ def get_client_port(self):
+ """
+ Get the remote client port number.
+ """
+ return self._other_port
+
+
class RemoteEndpointClient(RemoteEndpointProcessClient):
"""
Remote endpoint client.
12 ion/services/sa/tcaa/terrestrial_endpoint.py
@@ -284,6 +284,18 @@ def get_port(self):
"""
return self._this_port
+ def set_client_port(self, port):
+ """
+ Set the terrestrial client port.
+ """
+ self._other_port = port
+
+ def get_client_port(self):
+ """
+ Get the terrestrial client port.
+ """
+ return self._other_port
+
class TerrestrialEndpointClient(TerrestrialEndpointProcessClient):
"""
298 ion/services/sa/tcaa/test/test_2caa.py
@@ -96,9 +96,301 @@ class Test2CAA(IonIntegrationTestCase):
def setUp(self):
"""
"""
- pass
+
+ ###################################################################
+ # Internal parameters and container.
+ ###################################################################
+
+ # Internal parameters.
+ self._terrestrial_platform_id = 'terrestrial_id'
+ self._remote_platform_id = 'remote_id'
+ self._resource_id = 'fake_id'
+ self._remote_port = 0
+ self._terrestrial_port = 0
+ self._te_client = None
+ self._re_client = None
+ self._remote_pid = None
+ self._terrestrial_pid = None
+
+ self._done_telem_evt = AsyncResult()
+ self._done_queue_mod_evt = AsyncResult()
+ self._done_cmd_tx_evt = AsyncResult()
+ self._done_cmd_evt = AsyncResult()
+
+ # Start container.
+ log.debug('Starting capability container.')
+ self._start_container()
+
+ # Bring up services in a deploy file (no need to message).
+ log.info('Starting deploy services.')
+ self.container.start_rel_from_url('res/deploy/r2deploy.yml')
+
+ # Create a container client.
+ log.debug('Creating container client.')
+ container_client = ContainerAgentClient(node=self.container.node,
+ name=self.container.name)
+
+ ###################################################################
+ # Terrestrial endpoint.
+ ###################################################################
+
+ # Create the remote name.
+ xs_name = 'remote1'
+ terrestrial_svc_name = 'terrestrial_endpoint'
+ terrestrial_listen_name = terrestrial_svc_name + xs_name
+
+ # Create terrestrial config.
+ terrestrial_endpoint_config = {
+ 'other_host' : 'localhost',
+ 'other_port' : self._remote_port,
+ 'this_port' : 0,
+ 'platform_resource_id' : self._terrestrial_platform_id,
+ 'process' : {
+ 'listen_name' : terrestrial_listen_name
+ }
+ }
+
+ # Spawn the terrestrial endpoint process.
+ log.debug('Spawning terrestrial endpoint process.')
+ self._terrestrial_pid = container_client.spawn_process(
+ name=terrestrial_listen_name,
+ module='ion.services.sa.tcaa.terrestrial_endpoint',
+ cls='TerrestrialEndpoint',
+ config=terrestrial_endpoint_config)
+ log.debug('Terrestrial endpoint pid=%s.', str(self._terrestrial_pid))
+
+ # Create a terrestrial client.
+ self.te_client = TerrestrialEndpointClient(
+ process=FakeProcess(),
+ to_name=terrestrial_listen_name)
+ log.debug('Got te client %s.', str(self.te_client))
+ self._terrestrial_port = self.te_client.get_port()
+ log.debug('Terrestrial port is: %i', self._terrestrial_port)
+
+ ###################################################################
+ # Remote endpoint.
+ ###################################################################
+
+ remote_svc_name = 'remote_endpoint'
+ remote_listen_name = remote_svc_name + xs_name
+
+ # Create agent config.
+ remote_endpoint_config = {
+ 'other_host' : 'localhost',
+ 'other_port' : self._remote_port,
+ 'this_port' : 0,
+ 'platform_resource_id' : self._remote_platform_id,
+ 'process' : {
+ 'listen_name' : remote_listen_name
+ }
+ }
+
+ # Spawn the remote endpoint process.
+ log.debug('Spawning remote endpoint process.')
+ self._remote_pid = container_client.spawn_process(
+ name=remote_listen_name,
+ module='ion.services.sa.tcaa.remote_endpoint',
+ cls='RemoteEndpoint',
+ config=remote_endpoint_config)
+ log.debug('Remote endpoint pid=%s.', str(self._remote_pid))
+
+ # Create an endpoint client.
+ self.re_client = RemoteEndpointClient(
+ process=FakeProcess(),
+ to_name=remote_listen_name)
+ log.debug('Got re client %s.', str(self.re_client))
+
+ # Remember the remote port.
+ self._remote_port = self.re_client.get_port()
+ log.debug('The remote port is: %i.', self._remote_port)
+
+ ###################################################################
+ # Assign client ports.
+ # This is primarily for test purposes as the IP config in
+ # deployment will be fixed in advance.
+ ###################################################################
+
+ self.te_client.set_client_port(self._remote_port)
+ check_port = self.te_client.get_client_port()
+ log.debug('Terrestrial client port is: %i', check_port)
+
+ self.re_client.set_client_port(self._terrestrial_port)
+ check_port = self.re_client.get_client_port()
+ log.debug('Remote client port is: %i', check_port)
+
+ ###################################################################
+ # Start the event publisher and subscribers.
+ # Used to send fake agent telemetry publications to the endpoints,
+ # and to receive endpoint publications.
+ ###################################################################
+ self._event_publisher = EventPublisher()
+
+ # Start the event subscriber.
+ self._event_subscriber = EventSubscriber(
+ event_type='PlatformEvent',
+ callback=self.consume_event,
+ origin=self._terrestrial_platform_id)
+ self._event_subscriber.start()
+ self._event_subscriber._ready_event.wait(timeout=CFG.endpoint.receive.timeout)
+ self.addCleanup(self._event_subscriber.stop)
+
+ # Start the result subscriber.
+ self._result_subscriber = EventSubscriber(
+ event_type='RemoteCommandResult',
+ origin=self._resource_id,
+ callback=self.consume_event)
+ self._result_subscriber.start()
+ self._result_subscriber._ready_event.wait(timeout=CFG.endpoint.receive.timeout)
+ self.addCleanup(self._result_subscriber.stop)
+
+ ###################################################################
+ # Agent startup.
+ ###################################################################
+
+ def start_agent(self):
+ """
+ Start an instrument agent and client.
+ """
+
+ log.info('Creating driver integration test support:')
+ log.info('driver module: %s', DRV_MOD)
+ log.info('driver class: %s', DRV_CLS)
+ log.info('device address: %s', DEV_ADDR)
+ log.info('device port: %s', DEV_PORT)
+ log.info('log delimiter: %s', DELIM)
+ log.info('work dir: %s', WORK_DIR)
+ self._support = DriverIntegrationTestSupport(DRV_MOD,
+ DRV_CLS,
+ DEV_ADDR,
+ DEV_PORT,
+ DATA_PORT,
+ CMD_PORT,
+ PA_BINARY,
+ DELIM,
+ WORK_DIR)
+
+ # Start port agent, add stop to cleanup.
+ port = self._support.start_pagent()
+ log.info('Port agent started at port %i',port)
+
+ # Configure driver to use port agent port number.
+ DVR_CONFIG['comms_config'] = {
+ 'addr' : 'localhost',
+ 'port' : port
+ }
+ self.addCleanup(self._support.stop_pagent)
+
+ # Create agent config.
+ agent_config = {
+ 'driver_config' : DVR_CONFIG,
+ 'stream_config' : {},
+ 'agent' : {'resource_id': IA_RESOURCE_ID},
+ 'test_mode' : True
+ }
+
+ # Start instrument agent.
+ log.debug("Starting IA.")
+ container_client = ContainerAgentClient(node=self.container.node,
+ name=self.container.name)
+
+ ia_pid = container_client.spawn_process(name=IA_NAME,
+ module=IA_MOD,
+ cls=IA_CLS,
+ config=agent_config)
+
+ log.info('Agent pid=%s.', str(ia_pid))
- def test_xxx(sefl):
+ # Start a resource agent client to talk with the instrument agent.
+
+ self._ia_client = ResourceAgentClient(IA_RESOURCE_ID, process=FakeProcess())
+ log.info('Got ia client %s.', str(self._ia_client))
+
+ ###################################################################
+ # Telemetry publications to start/stop endpoints.
+ # (Normally published by the appropriate platform agents.)
+ ###################################################################
+
+ def terrestrial_link_up(self):
+ """
+ Publish telemetry available to the terrestrial endpoint.
"""
+ # Publish a link up event to be caught by the terrestrial endpoint.
+ log.debug('Publishing terrestrial telemetry available event.')
+ self._event_publisher.publish_event(
+ event_type='PlatformTelemetryEvent',
+ origin=self._terrestrial_platform_id,
+ status = TelemetryStatusType.AVAILABLE)
+
+ def terrestrial_link_down(self):
+ """
+ Publish telemetry unavailable to the terrestrial endpoint.
+ """
+ # Publish a link down event to be caught by the terrestrial endpoint.
+ log.debug('Publishing terrestrial telemetry unavailable event.')
+ self._event_publisher.publish_event(
+ event_type='PlatformTelemetryEvent',
+ origin=self._terrestrial_platform_id,
+ status = TelemetryStatusType.UNAVAILABLE)
+
+ def remote_link_up(self):
+ """
+ Publish telemetry available to the remote endpoint.
+ """
+ # Publish a link up event to be caught by the remote endpoint.
+ log.debug('Publishing remote telemetry available event.')
+ self._event_publisher.publish_event(
+ event_type='PlatformTelemetryEvent',
+ origin=self._remote_platform_id,
+ status = TelemetryStatusType.AVAILABLE)
+
+ def remote_link_down(self):
"""
- pass
+ Publish telemetry unavailable to the remote endpoint.
+ """
+ # Publish a link down event to be caught by the remote endpoint.
+ log.debug('Publishing remote telemetry unavailable event.')
+ self._event_publisher.publish_event(
+ event_type='PlatformTelemetryEvent',
+ origin=self._remote_platform_id,
+ status = TelemetryStatusType.UNAVAILABLE)
+
+ def consume_event(self, evt, *args, **kwargs):
+ """
+ Test callback for events.
+ """
+ log.debug('Test got event: %s, args: %s, kwargs: %s',
+ str(evt), str(args), str(kwargs))
+
+ """
+ if evt.type_ == 'PublicPlatformTelemetryEvent':
+ self._telem_evts.append(evt)
+ if self._no_telem_evts > 0 and self._no_telem_evts == len(self._telem_evts):
+ self._done_telem_evt.set()
+
+ elif evt.type_ == 'RemoteQueueModifiedEvent':
+ self._queue_mod_evts.append(evt)
+ if self._no_queue_mod_evts > 0 and self._no_queue_mod_evts == len(self._queue_mod_evts):
+ self._done_queue_mod_evt.set()
+
+ elif evt.type_ == 'RemoteCommandTransmittedEvent':
+ self._cmd_tx_evts.append(evt)
+ if self._no_cmd_tx_evts > 0 and self._no_cmd_tx_evts == len(self._cmd_tx_evts):
+ self._done_cmd_tx_evt.set()
+
+ elif evt.type_ == 'RemoteCommandResult':
+ cmd = evt.command
+ self._results_recv[cmd.command_id] = cmd
+ if len(self._results_recv) == self._no_requests:
+ self._done_cmd_evt.set()
+ """
+
+ def test_xxx(self):
+ """
+ """
+
+ gevent.sleep(2)
+ self.terrestrial_link_up()
+ gevent.sleep(2)
+ self.terrestrial_link_down()
+
+
1  ion/services/sa/tcaa/test/test_terrestrial_endpoint.py
@@ -108,6 +108,7 @@ def setUp(self):
self._done_queue_mod_evts = AsyncResult()
self._done_telem_evts = AsyncResult()
self._done_cmd_tx_evts = AsyncResult()
+
# Start container.
log.debug('Starting capability container.')
self._start_container()