Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

158 lines (115 sloc) 7.423 kb
from interface.services.icontainer_agent import ContainerAgentClient
#from pyon.ion.endpoint import ProcessRPCClient
from pyon.public import Container, log, IonObject
from pyon.util.int_test import IonIntegrationTestCase
from interface.services.coi.iresource_registry_service import ResourceRegistryServiceClient
from ion.services.sa.instrument.instrument_management_service import InstrumentManagementService
from interface.services.sa.iinstrument_management_service import IInstrumentManagementService, InstrumentManagementServiceClient
from pyon.util.context import LocalContextMixin
from pyon.core.exception import BadRequest, NotFound, Conflict
from pyon.public import RT, PRED, LCS
from mock import Mock, patch
from pyon.util.unit_test import PyonTestCase
from nose.plugins.attrib import attr
import unittest
#from pyon.util.log import log
from ion.services.sa.test.helpers import any_old
@attr('INT', group='sa')
class TestInstrumentManagementServiceIntegration(IonIntegrationTestCase):
def setUp(self):
# Start container
#print 'instantiating container'
self._start_container()
#container = Container()
#print 'starting container'
#container.start()
#print 'started container'
self.container.start_rel_from_url('res/deploy/r2deploy.yml')
self.RR = ResourceRegistryServiceClient(node=self.container.node)
self.IMS = InstrumentManagementServiceClient(node=self.container.node)
print 'started services'
@unittest.skip('this test just for debugging setup')
def test_just_the_setup(self):
return
@unittest.skip('this test just for debugging setup')
def test_resources_associations(self):
"""
create one of each resource and association used by IMS
to guard against problems in ion-definitions
"""
#stuff we control
instrument_agent_instance_id, _ = self.RR.create(any_old(RT.InstrumentAgentInstance))
instrument_agent_id, _ = self.RR.create(any_old(RT.InstrumentAgent))
instrument_device_id, _ = self.RR.create(any_old(RT.InstrumentDevice))
instrument_model_id, _ = self.RR.create(any_old(RT.InstrumentModel))
platform_agent_instance_id, _ = self.RR.create(any_old(RT.PlatformAgentInstance))
platform_agent_id, _ = self.RR.create(any_old(RT.PlatformAgent))
platform_device_id, _ = self.RR.create(any_old(RT.PlatformDevice))
platform_model_id, _ = self.RR.create(any_old(RT.PlatformModel))
sensor_device_id, _ = self.RR.create(any_old(RT.SensorDevice))
sensor_model_id, _ = self.RR.create(any_old(RT.SensorModel))
#stuff we associate to
data_producer_id, _ = self.RR.create(any_old(RT.DataProducer))
instrument_agent_instance_id #is only a target
#instrument_agent
self.RR.create_association(instrument_agent_id, PRED.hasModel, instrument_model_id)
self.RR.create_association(instrument_agent_id, PRED.hasInstance, instrument_agent_instance_id)
#instrument_device
self.RR.create_association(instrument_device_id, PRED.hasModel, instrument_model_id)
self.RR.create_association(instrument_device_id, PRED.hasAgentInstance, instrument_agent_instance_id)
#self.RR.create_association(instrument_device_id, PRED.hasSensor, sensor_device_id)
self.RR.create_association(instrument_device_id, PRED.hasDataProducer, data_producer_id)
instrument_model_id #is only a target
platform_agent_instance_id #is only a target
#platform_agent
self.RR.create_association(platform_agent_id, PRED.hasModel, platform_model_id)
self.RR.create_association(platform_agent_id, PRED.hasInstance, platform_agent_instance_id)
#platform_device
self.RR.create_association(platform_device_id, PRED.hasModel, platform_model_id)
self.RR.create_association(platform_device_id, PRED.hasAgentInstance, platform_agent_instance_id)
self.RR.create_association(platform_device_id, PRED.hasInstrument, instrument_device_id)
platform_model_id #is only a target
#sensor_device
self.RR.create_association(sensor_device_id, PRED.hasModel, sensor_model_id)
sensor_model_id #is only a target
def test_get_extended_instrument_device(self):
#stuff we control
instrument_agent_instance_id, _ = self.RR.create(any_old(RT.InstrumentAgentInstance))
instrument_agent_id, _ = self.RR.create(any_old(RT.InstrumentAgent))
instrument_device_id, _ = self.RR.create(any_old(RT.InstrumentDevice))
instrument_model_id, _ = self.RR.create(any_old(RT.InstrumentModel))
platform_agent_instance_id, _ = self.RR.create(any_old(RT.PlatformAgentInstance))
platform_agent_id, _ = self.RR.create(any_old(RT.PlatformAgent))
platform_device_id, _ = self.RR.create(any_old(RT.PlatformDevice))
platform_model_id, _ = self.RR.create(any_old(RT.PlatformModel))
sensor_device_id, _ = self.RR.create(any_old(RT.SensorDevice))
sensor_model_id, _ = self.RR.create(any_old(RT.SensorModel))
#stuff we associate to
data_producer_id, _ = self.RR.create(any_old(RT.DataProducer))
instrument_agent_instance_id #is only a target
#instrument_agent
self.RR.create_association(instrument_agent_id, PRED.hasModel, instrument_model_id)
self.RR.create_association(instrument_agent_id, PRED.hasInstance, instrument_agent_instance_id)
#instrument_device
self.RR.create_association(instrument_device_id, PRED.hasModel, instrument_model_id)
self.RR.create_association(instrument_device_id, PRED.hasAgentInstance, instrument_agent_instance_id)
#self.RR.create_association(instrument_device_id, PRED.hasSensor, sensor_device_id)
self.RR.create_association(instrument_device_id, PRED.hasDataProducer, data_producer_id)
instrument_model_id #is only a target
platform_agent_instance_id #is only a target
#platform_agent
self.RR.create_association(platform_agent_id, PRED.hasModel, platform_model_id)
self.RR.create_association(platform_agent_id, PRED.hasInstance, platform_agent_instance_id)
#platform_device
self.RR.create_association(platform_device_id, PRED.hasModel, platform_model_id)
self.RR.create_association(platform_device_id, PRED.hasAgentInstance, platform_agent_instance_id)
self.RR.create_association(platform_device_id, PRED.hasInstrument, instrument_device_id)
platform_model_id #is only a target
#sensor_device
self.RR.create_association(sensor_device_id, PRED.hasModel, sensor_model_id)
#Used for testing of computed attributes for calculating associations
self.RR.create_association(instrument_device_id,PRED.hasSensor, sensor_device_id)
sensor_model_id #is only a target
extended_instrument = self.IMS.get_instrument_device_extension(instrument_device_id)
self.assertEqual(instrument_device_id,extended_instrument._id)
self.assertEqual(extended_instrument.computed.sensor_count,1)
Jump to Line
Something went wrong with that request. Please try again.