Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Basic remote endpoint service and tests. Endpoint mixin class.

  • Loading branch information...
commit 0660cf8a9b89c22ac1975c2aac6118e1a810cb72 1 parent a781c32
@edwardhunter2 edwardhunter2 authored
View
129 ion/services/sa/tcaa/endpoint_mixin.py
@@ -0,0 +1,129 @@
+#!/usr/bin/env python
+"""
+@package ion.services.sa.tcaa.endpoint_mixin
+@file ion/services/sa/tcaa/endpoint_mixin.py
+@author Edward Hunter
+@brief 2CAA Terrestrial endpoint mixin.
+"""
+
+__author__ = 'Edward Hunter'
+__license__ = 'Apache 2.0'
+
+# Pyon log and config objects.
+from pyon.public import log
+from pyon.public import CFG
+
+import uuid
+import time
+
+# Pyon exceptions.
+from pyon.core.exception import BadRequest
+from pyon.core.exception import Conflict
+
+from pyon.event.event import EventPublisher, EventSubscriber
+from interface.objects import TelemetryStatusType, RemoteCommand
+
+from ion.services.sa.tcaa.r3pc import R3PCServer
+from ion.services.sa.tcaa.r3pc import R3PCClient
+
+
+
+class EndpointMixin(object):
+ """
+ """
+
+ def __init__(self):
+ """
+ """
+ pass
+
+ def mixin_on_init(self):
+ """
+ """
+ self._server = None
+ self._client = None
+ self._other_host = self.CFG.other_host
+ self._other_port = self.CFG.other_port
+ self._this_port = self.CFG.this_port
+ self._platform_resource_id = self.CFG.platform_resource_id
+ self._link_status = TelemetryStatusType.UNAVAILABLE
+ self._event_subscriber = None
+ self._server_greenlet = None
+ self._publisher = EventPublisher()
+
+ def mixin_on_start(self):
+ """
+ """
+ self._server = R3PCServer(self._req_callback,
+ self._server_close_callback)
+ self._client = R3PCClient(self._ack_callback,
+ self._client_close_callback)
+ if self._this_port == 0:
+ self._this_port = self._server.start('*', self._this_port)
+ else:
+ self._server.start('*', self._this_port)
+ log.debug('%s server binding to *:%i', self.__class__.__name__,
+ self._this_port)
+
+ # Start the event subscriber.
+ self._event_subscriber = EventSubscriber(
+ event_type='PlatformTelemetryEvent',
+ callback=self._consume_telemetry_event,
+ origin=self._platform_resource_id)
+ self._event_subscriber.start()
+ self._event_subscriber._ready_event.wait(timeout=5)
+
+ def mixin_on_quit(self):
+ """
+ """
+ self._stop()
+
+ def mixin_on_stop(self):
+ """
+ """
+ self._stop()
+
+ ######################################################################
+ # Helpers.
+ ######################################################################
+
+ def _on_link_up(self):
+ """
+ Processing on link up event.
+ Start client socket.
+ ION link availability published when pending commands are transmitted.
+ """
+ log.debug('%s client connecting to %s:%i',
+ self.__class__.__name__,
+ self._other_host, self._other_port)
+ self._client.start(self._other_host, self._other_port)
+ self._publisher.publish_event(
+ event_type='PublicPlatformTelemetryEvent',
+ origin=self._platform_resource_id,
+ status=TelemetryStatusType.AVAILABLE)
+
+ def _on_link_down(self):
+ """
+ Processing on link down event.
+ Stop client socket and publish ION link unavailability.
+ """
+ self._client.stop()
+ self._publisher.publish_event(
+ event_type='PublicPlatformTelemetryEvent',
+ origin=self._platform_resource_id,
+ status=TelemetryStatusType.UNAVAILABLE)
+
+ def _stop(self):
+ """
+ Stop sockets and subscriber.
+ """
+ if self._event_subscriber:
+ self._event_subscriber.stop()
+ self._event_subscriber = None
+ if self._server:
+ self._server.stop()
+ self._server = None
+ if self._client:
+ self._client.stop()
+ self._client = None
+
View
95 ion/services/sa/tcaa/remote_endpoint.py
@@ -25,17 +25,16 @@
from interface.services.sa.iremote_endpoint import BaseRemoteEndpoint
from interface.services.sa.iremote_endpoint import RemoteEndpointProcessClient
-from ion.services.sa.tcaa.r3pc import R3PCServer
-from ion.services.sa.tcaa.r3pc import R3PCClient
+from ion.services.sa.tcaa.endpoint_mixin import EndpointMixin
-class RemoteEndpoint(BaseRemoteEndpoint):
+class RemoteEndpoint(BaseRemoteEndpoint, EndpointMixin):
"""
"""
def __init__(self, *args, **kwargs):
"""
For framework level code only.
"""
- super(BaseRemoteEndpoint, self).__init__(*args, **kwargs)
+ super(RemoteEndpoint, self).__init__(*args, **kwargs)
######################################################################
# Framework process lifecycle funcitons.
@@ -46,46 +45,29 @@ def on_init(self):
Application level initializer.
Setup default internal values.
"""
- super(BaseRemoteEndpoint, self).on_init()
- self._server = None
- self._client = None
- self._terrestrial_host = self.CFG.terrestrial_host
- self._terrestrial_port = self.CFG.terrestrial_port
- self._remote_port = 0
- self._platform_resource_id = self.CFG.platform_resource_id
- self._remote_port = self.CFG.remote_port
- self._link_status = TelemetryStatusType.UNAVAILABLE
- self._event_subscriber = None
- self._server_greenlet = None
- self._publisher = EventPublisher()
+ super(RemoteEndpoint, self).on_init()
+ self.mixin_on_init()
def on_start(self):
"""
Process about to be started.
"""
- super(BaseRemoteEndpoint, self).on_start()
-
- # Start the event subscriber.
- self._event_subscriber = EventSubscriber(
- event_type='PlatformTelemetryEvent',
- callback=self._consume_telemetry_event,
- origin=self._platform_resource_id)
- self._event_subscriber.start()
- self._event_subscriber._ready_event.wait(timeout=5)
+ super(RemoteEndpoint, self).on_start()
+ self.mixin_on_start()
def on_stop(self):
"""
Process about to be stopped.
"""
- self._stop()
- super(BaseRemoteEndpoint, self).on_stop()
+ self.mixin_on_stop()
+ super(RemoteEndpoint, self).on_stop()
def on_quit(self):
"""
Process terminated following.
"""
- self._stop()
- super(BaseRemoteEndpoint, self).on_quit()
+ self.mixin_on_quit()
+ super(RemoteEndpoint, self).on_quit()
######################################################################
# Callbacks.
@@ -114,26 +96,17 @@ def _client_close_callback(self):
def _consume_telemetry_event(self, *args, **kwargs):
"""
"""
- print '##########################################'
- print "GOT A TELEMETRY EVENT: args:%s kwargs:%s" % (str(args), str(kwargs))
-
- ######################################################################
- # Helpers.
- ######################################################################
-
- def _stop(self):
- """
- Stop sockets and subscriber.
- """
- if self._event_subscriber:
- self._event_subscriber.stop()
- self._event_subscriber = None
- if self._server:
- self._server.stop()
- self._server = None
- if self._client:
- self._client.stop()
- self._client = None
+ log.debug('Telemetry event received by remote endpoint, args: %s, kwargs: %s',
+ str(args), str(kwargs))
+ evt = args[0]
+ self._link_status = evt.status
+ if evt.status == TelemetryStatusType.AVAILABLE:
+ log.debug('Remote endpoint telemetry available.')
+ self._on_link_up()
+
+ elif evt.status == TelemetryStatusType.UNAVAILABLE:
+ log.debug('Remote endpoint telemetry not available.')
+ self._on_link_down()
######################################################################
# Commands.
@@ -142,7 +115,7 @@ def _stop(self):
def get_port(self):
"""
"""
- return self._remote_port
+ return self._this_port
class RemoteEndpointClient(RemoteEndpointProcessClient):
"""
@@ -150,25 +123,3 @@ class RemoteEndpointClient(RemoteEndpointProcessClient):
"""
pass
-"""
-from pyon.core.bootstrap import get_service_registry
-svc_client_cls = get_service_registry().get_service_by_name(svc_name).client
-"""
-
-"""
-list procs from self.container.proc_manager.list_procs():
-EventPersister(name=event_persister,id=Edwards-MacBook-Pro_local_12469.1,type=standalone)
-ResourceRegistryService(name=resource_registry,id=Edwards-MacBook-Pro_local_12469.3,type=service)
-IdentityManagementService(name=identity_management,id=Edwards-MacBook-Pro_local_12469.5,type=service)
-DirectoryService(name=directory,id=Edwards-MacBook-Pro_local_12469.4,type=service)
-ExchangeManagementService(name=exchange_management,id=Edwards-MacBook-Pro_local_12469.7,type=service)
-
-procs by name (self.container.proc_manager.procs_by_name):
-data_retriever == DataRetrieverService(name=data_retriever,id=Edwards-MacBook-Pro_local_12469.18,type=service)
-agent_management == AgentManagementService(name=agent_management,id=Edwards-MacBook-Pro_local_12469.9,type=service)
-event_persister == EventPersister(name=event_persister,id=Edwards-MacBook-Pro_local_12469.1,type=standalone)
-identity_management == IdentityManagementService(name=identity_management,id=Edwards-MacBook-Pro_local_12469.5,type=service)
-catalog_management == CatalogManagementService(name=catalog_management,id=Edwards-MacBook-Pro_local_12469.22,type=service)
-pubsub_management == PubsubManagementService(name=pubsub_management,id=Edwards-MacBook-Pro_local_12469.15,type=service)
-"""
-
View
84 ion/services/sa/tcaa/terrestrial_endpoint.py
@@ -25,10 +25,9 @@
from interface.services.sa.iterrestrial_endpoint import BaseTerrestrialEndpoint
from interface.services.sa.iterrestrial_endpoint import TerrestrialEndpointProcessClient
-from ion.services.sa.tcaa.r3pc import R3PCServer
-from ion.services.sa.tcaa.r3pc import R3PCClient
+from ion.services.sa.tcaa.endpoint_mixin import EndpointMixin
-class TerrestrialEndpoint(BaseTerrestrialEndpoint):
+class TerrestrialEndpoint(BaseTerrestrialEndpoint, EndpointMixin):
"""
Terrestrial endpoint for two component agent architecture.
This class provides a manipulable terrestrial command queue and fully
@@ -39,7 +38,7 @@ def __init__(self, *args, **kwargs):
"""
For framework level code only.
"""
- super(BaseTerrestrialEndpoint, self).__init__(*args, **kwargs)
+ super(TerrestrialEndpoint, self).__init__(*args, **kwargs)
######################################################################
# Framework process lifecycle functions.
@@ -51,17 +50,8 @@ def on_init(self):
Setup default internal values.
"""
super(BaseTerrestrialEndpoint, self).on_init()
- self._server = None
- self._client = None
- self._remote_host = self.CFG.remote_host
- self._remote_port = self.CFG.remote_port
- self._terrestrial_port = self.CFG.terrestrial_port
- self._platform_resource_id = self.CFG.platform_resource_id
- self._link_status = TelemetryStatusType.UNAVAILABLE
+ self.mixin_on_init()
self._tx_dict = {}
- self._event_subscriber = None
- self._server_greenlet = None
- self._publisher = EventPublisher()
def on_start(self):
"""
@@ -71,30 +61,14 @@ def on_start(self):
Start telemetry subscriber.
"""
super(BaseTerrestrialEndpoint, self).on_start()
- self._server = R3PCServer(self._req_callback,
- self._server_close_callback)
- self._client = R3PCClient(self._ack_callback,
- self._client_close_callback)
- if self._terrestrial_port == 0:
- self._terrestrial_port = self._server.start('*', self._terrestrial_port)
- else:
- self._server.start('*', self._terrestrial_port)
- log.debug('Terrestrial server binding to *:%i', self._terrestrial_port)
-
- # Start the event subscriber.
- self._event_subscriber = EventSubscriber(
- event_type='PlatformTelemetryEvent',
- callback=self._consume_telemetry_event,
- origin=self._platform_resource_id)
- self._event_subscriber.start()
- self._event_subscriber._ready_event.wait(timeout=5)
+ self.mixin_on_start()
def on_stop(self):
"""
Process about to be stopped.
Stop sockets and subscriber.
"""
- self._stop()
+ self.mixin_on_stop()
super(BaseTerrestrialEndpoint, self).on_stop()
def on_quit(self):
@@ -102,7 +76,7 @@ def on_quit(self):
Process terminated following.
Stop sockets and subscriber.
"""
- self._stop()
+ self.mixin_on_quit()
super(BaseTerrestrialEndpoint, self).on_quit()
######################################################################
@@ -173,48 +147,6 @@ def _consume_telemetry_event(self, *args, **kwargs):
log.debug('Telemetry not available.')
self._on_link_down()
- ######################################################################
- # Helpers.
- ######################################################################
-
- def _on_link_up(self):
- """
- Processing on link up event.
- Start client socket.
- ION link availability published when pending commands are transmitted.
- """
- log.debug('Terrestrial client connecting to %s:%i',
- self._remote_host, self._remote_port)
- self._client.start(self._remote_host, self._remote_port)
- self._publisher.publish_event(
- event_type='PublicPlatformTelemetryEvent',
- origin=self._platform_resource_id,
- status=TelemetryStatusType.AVAILABLE)
-
- def _on_link_down(self):
- """
- Processing on link down event.
- Stop client socket and publish ION link unavailability.
- """
- self._client.stop()
- self._publisher.publish_event(
- event_type='PublicPlatformTelemetryEvent',
- origin=self._platform_resource_id,
- status=TelemetryStatusType.UNAVAILABLE)
-
- def _stop(self):
- """
- Stop sockets and subscriber.
- """
- if self._event_subscriber:
- self._event_subscriber.stop()
- self._event_subscriber = None
- if self._server:
- self._server.stop()
- self._server = None
- if self._client:
- self._client.stop()
- self._client = None
######################################################################
# Commands.
@@ -325,7 +257,7 @@ def get_port(self):
"""
Retrieve the terrestrial server port.
"""
- return self._terrestrial_port
+ return self._this_port
class TerrestrialEndpointClient(TerrestrialEndpointProcessClient):
View
73 ion/services/sa/tcaa/test/test_remote_endpoint.py
@@ -65,16 +65,22 @@ class TestRemoteEndpoint(IonIntegrationTestCase):
def setUp(self):
"""
"""
- self._terrestrial_host = None
- self._terrestrial_port = None
- self._remote_port = None
- self._platform_resource_id = 'a_remote_platform'
+
+ self._terrestrial_server = R3PCServer(self.consume_req, self.terrestrial_server_close)
+ self._terrestrial_client = R3PCClient(self.consume_ack, self.terrestrial_client_close)
+ self.addCleanup(self._terrestrial_server.stop)
+ self.addCleanup(self._terrestrial_client.stop)
+ self._other_port = self._terrestrial_server.start('*', 0)
+ log.debug('Terrestrial server binding to *:%i', self._other_port)
+
+ self._other_host = 'localhost'
+ self._platform_resource_id = 'abc123'
# Start container.
log.debug('Staring capability container.')
self._start_container()
- # Bring up services in a deploy file (no need to message)
+ # Bring up services in a deploy file (no need to message).
log.info('Staring deploy services.')
self.container.start_rel_from_url('res/deploy/r2deploy.yml')
@@ -85,30 +91,30 @@ def setUp(self):
# Create agent config.
endpoint_config = {
- 'terrestrial_host' : self._terrestrial_host,
- 'terrestrial_port' : self._terrestrial_port,
- 'remote_port' : 0,
+ 'other_host' : self._other_host,
+ 'other_port' : self._other_port,
+ 'this_port' : 0,
'platform_resource_id' : self._platform_resource_id
}
- # Spawn the terrestrial enpoint process.
+ # Spawn the remote enpoint process.
log.debug('Spawning remote endpoint process.')
- te_pid = container_client.spawn_process(
+ re_pid = container_client.spawn_process(
name='remote_endpoint_1',
module='ion.services.sa.tcaa.remote_endpoint',
cls='RemoteEndpoint',
config=endpoint_config)
- log.debug('Endpoint pid=%s.', str(te_pid))
+ log.debug('Endpoint pid=%s.', str(re_pid))
# Create an endpoint client.
self.re_client = RemoteEndpointClient(
process=FakeProcess(),
- to_name=te_pid)
- log.debug('Got te client %s.', str(self.re_client))
+ to_name=re_pid)
+ log.debug('Got re client %s.', str(self.re_client))
- # Remember the terrestrial port.
- self._remote_port = self.re_client.get_port()
- log.debug('The remote port is: %i.', self._remote_port)
+ # Remember the remote port.
+ self._this_port = self.re_client.get_port()
+ log.debug('The remote port is: %i.', self._this_port)
# Start the event publisher.
self._event_publisher = EventPublisher()
@@ -117,9 +123,9 @@ def on_link_up(self):
"""
Called by a test to simulate turning the link on.
"""
- log.debug('Remote client connecting to localhost:%i.',
- self._terrestrial_port)
- self._remote_client.start('localhost', self._terrestrial_port)
+ log.debug('Terrestrial client connecting to localhost:%i.',
+ self._this_port)
+ self._terrestrial_client.start('localhost', self._this_port)
# Publish a link up event to be caught by the endpoint.
log.debug('Publishing telemetry event.')
self._event_publisher.publish_event(
@@ -131,7 +137,7 @@ def on_link_down(self):
"""
Called by a test to simulate turning the link off.
"""
- self._remote_client.stop()
+ self._terrestrial_client.stop()
# Publish a link down event to be caught by the endpoint.
log.debug('Publishing telemetry event.')
self._event_publisher.publish_event(
@@ -139,8 +145,31 @@ def on_link_down(self):
origin=self._platform_resource_id,
status = TelemetryStatusType.UNAVAILABLE)
- def test_xxx(self):
+ def consume_req(self):
"""
"""
pass
-
+
+ def consume_ack(self):
+ """
+ """
+ pass
+
+ def terrestrial_server_close(self):
+ """
+ """
+ pass
+
+ def terrestrial_client_close(self):
+ """
+ """
+ pass
+
+ def test_xxx(self):
+ """
+ """
+
+ self.on_link_up()
+ gevent.sleep(2)
+ self.on_link_down()
+ gevent.sleep(2)
View
22 ion/services/sa/tcaa/test/test_terrestrial_endpoint.py
@@ -84,11 +84,11 @@ def setUp(self):
self._remote_client = R3PCClient(self.consume_ack, self.remote_client_close)
self.addCleanup(self._remote_server.stop)
self.addCleanup(self._remote_client.stop)
- self._remote_port = self._remote_server.start('*', 0)
- log.debug('Remote server binding to *:%i', self._remote_port)
+ self._other_port = self._remote_server.start('*', 0)
+ log.debug('Remote server binding to *:%i', self._other_port)
# Set internal variables.
- self._remote_host = 'localhost'
+ self._other_host = 'localhost'
self._platform_resource_id = 'abc123'
self._resource_id = 'fake_id'
self._no_requests = 10
@@ -120,9 +120,9 @@ def setUp(self):
# Create agent config.
endpoint_config = {
- 'remote_host' : self._remote_host,
- 'remote_port' : self._remote_port,
- 'terrestrial_port' : 0,
+ 'other_host' : self._other_host,
+ 'other_port' : self._other_port,
+ 'this_port' : 0,
'platform_resource_id' : self._platform_resource_id
}
@@ -142,7 +142,7 @@ def setUp(self):
log.debug('Got te client %s.', str(self.te_client))
# Remember the terrestrial port.
- self._terrestrial_port = self.te_client.get_port()
+ self._this_port = self.te_client.get_port()
# Start the event publisher.
self._event_publisher = EventPublisher()
@@ -197,8 +197,8 @@ def on_link_up(self):
Called by a test to simulate turning the link on.
"""
log.debug('Remote client connecting to localhost:%i.',
- self._terrestrial_port)
- self._remote_client.start('localhost', self._terrestrial_port)
+ self._this_port)
+ self._remote_client.start('localhost', self._this_port)
# Publish a link up event to be caught by the endpoint.
log.debug('Publishing telemetry event.')
self._event_publisher.publish_event(
@@ -362,8 +362,8 @@ def test_remote_late(self):
gevent.sleep(3)
- self._remote_client.start('localhost', self._terrestrial_port)
- self._remote_server.start('*', self._remote_port)
+ self._remote_client.start('localhost', self._this_port)
+ self._remote_server.start('*', self._other_port)
self._done_cmd_tx_evts.get(timeout=5)
self._done_evt.get(timeout=10)
Please sign in to comment.
Something went wrong with that request. Please try again.