Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add managed listening endpoint API to BaseService

  • Loading branch information...
commit c0d5c5a274b13fcb6d923e6936779ae7eca378d6 1 parent 87dbbb1
@daf daf authored
Showing with 51 additions and 2 deletions.
  1. +20 −1 pyon/ion/process.py
  2. +31 −1 pyon/service/service.py
View
21 pyon/ion/process.py
@@ -160,7 +160,7 @@ def _child_failed(self, child):
def add_endpoint(self, listener):
"""
- Adds a listening endpoint to this ION process.
+ Adds a listening endpoint to be managed by this ION process.
Spawns the listen loop and sets the routing call to synchronize incoming messages
here. If this process hasn't been started yet, adds it to the list of listeners
@@ -183,6 +183,25 @@ def add_endpoint(self, listener):
else:
self._startup_listeners.append(listener)
+ def remove_endpoint(self, listener):
+ """
+ Removes a listening endpoint from management by this ION process.
+
+ If the endpoint is unknown to this ION process, raises an error.
+
+ @return The PyonThread running the listen loop, if it exists. You are
+ responsible for closing it when appropriate.
+ """
+
+ if listener in self.listeners:
+ self.listeners.remove(listener)
+ return self._listener_map.pop(listener)
+ elif listener in self._startup_listeners:
+ self._startup_listeners.remove(listener)
+ return None
+ else:
+ raise IonProcessError("Cannot remove unrecognized listener: %s" % listener)
+
def target(self, *args, **kwargs):
"""
Control entrypoint. Setup the base properties for this process (mainly a listener).
View
32 pyon/service/service.py
@@ -7,7 +7,7 @@
from zope.interface import implementedBy
-from pyon.core.exception import BadRequest
+from pyon.core.exception import BadRequest, ServerError
from pyon.util.log import log
from pyon.util.containers import named_any, itersubclasses
from pyon.util.context import LocalContextMixin
@@ -119,6 +119,36 @@ def __str__(self):
",type=", proc_type,
")"))
+ def add_endpoint(self, endpoint):
+ """
+ Adds a managed listening endpoint to this service/process.
+
+ The service/process must be running inside of an IonProcessThread, or this
+ method will raise an error.
+
+ A managed listening endpoint will report failures up to the process, then to
+ the container's process manager.
+ """
+ if self._process is None:
+ raise ServerError("No attached IonProcessThread")
+
+ self._process.add_endpoint(endpoint)
+
+ def remove_endpoint(self, endpoint):
+ """
+ Removes an endpoint from being managed by this service/process.
+
+ The service/process must be running inside of an IonProcessThread, or this
+ method will raise an error. It will also raise an error if the endpoint is
+ not currently managed.
+
+ Errors raised in the endpoint will no longer be reported to the process or
+ process manager.
+ """
+ if self._process is None:
+ raise ServerError("No attached IonProcessThread")
+
+ self._process.remove_endpoint(endpoint)
# -----------------------------------------------------------------------------------------------
# Service management infrastructure
Please sign in to comment.
Something went wrong with that request. Please try again.