Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #394 from labisso/581

Use eeagent direct queue for messaging from PD
  • Loading branch information...
commit a82caf91cb9b3a4530995d58af6390805034c2db 2 parents e245905 + 0c70819
David LaBissoniere labisso authored
9 ion/agents/cei/execution_engine_agent.py
View
@@ -60,7 +60,7 @@ def on_init(self):
interval = self.CFG.eeagent.get('heartbeat', DEFAULT_HEARTBEAT)
if interval > 0:
- self.heartbeater = HeartBeater(self.CFG, self._factory, log=log)
+ self.heartbeater = HeartBeater(self.CFG, self._factory, self.id, log=log)
self.heartbeater.poll()
self.heartbeat_thread = looping_call(0.1, self.heartbeater.poll)
else:
@@ -93,7 +93,7 @@ def rcmd_dump_state(self):
class HeartBeater(object):
- def __init__(self, CFG, factory, log=logging):
+ def __init__(self, CFG, factory, process_id, log=logging):
self._log = log
self._log.log(logging.DEBUG, "Starting the heartbeat thread")
@@ -103,6 +103,7 @@ def __init__(self, CFG, factory, log=logging):
self._res = None
self._done = False
self._factory = factory
+ self.process_id = process_id
self._publisher = Publisher()
self._pd_name = CFG.eeagent.get('heartbeat_queue', 'heartbeat_queue')
@@ -129,11 +130,11 @@ def poll(self):
def beat(self):
try:
beat = make_beat_msg(self._factory, self._CFG)
- message = dict(beat=beat, resource_id=self._CFG.agent.resource_id)
+ message = dict(beat=beat, eeagent_id=self.process_id, resource_id=self._CFG.agent.resource_id)
to_name = self._pd_name
self._log.debug("Send heartbeat: %s to %s", message, self._pd_name)
self._publisher.publish(message, to_name=to_name)
- except:
+ except Exception:
self._log.exception("beat failed")
46 ion/services/cei/process_dispatcher_service.py
View
@@ -125,7 +125,6 @@ def await(self, timeout=0):
self.process_id,
timeout)
-
return ret or self.last_chance
def _get_last_chance(self):
@@ -134,6 +133,7 @@ def _get_last_chance(self):
def _get_first_chance(self):
return self.first_chance
+
class ProcessDispatcherService(BaseProcessDispatcherService):
# Implementation notes:
@@ -389,6 +389,7 @@ def _get_process_name(self, process_definition, configuration):
return name
+
class PDDashiHandler(object):
"""Dashi messaging handlers for the Process Dispatcher"""
@@ -691,6 +692,7 @@ def list(self):
ProcessStateEnum.REJECTED: "900-REJECTED"
}
+
class Notifier(object):
"""Sends Process state notifications via ION events
@@ -750,38 +752,11 @@ class AnyEEAgentClient(object):
def __init__(self, process):
self.process = process
- # it's ok to cache these clients indefinitely. Longer term we may need
- # to prune this cache if we are dealing with many eeagents that come and
- # go.
- self.cache = {}
-
- def _get_client_for_eeagent(self, resource_id, attempts=60):
-
- cached = self.cache.get(resource_id)
- if cached:
- return cached
-
- exception = None
- for i in range(0, attempts):
- try:
- resource_client = SimpleResourceAgentClient(resource_id, process=self.process)
- client = ExecutionEngineAgentClient(resource_client)
- self.cache[resource_id] = client
- return client
- except (NotFound, ResourceNotFound, ServerError), e:
- # This exception catches a race condition, where:
- # 1. EEagent spawns and starts heartbeater
- # 2. heartbeat gets sent
- # 3. PD recieves heartbeat and tries to send a message but EEAgent,
- # hasn't been registered yet
- #
- # So, we try it a few times hoping that it'll come up
- log.exception("Couldn't get eeagent client")
- gevent.sleep(1)
- exception = e
- else:
- raise exception
+ def _get_client_for_eeagent(self, eeagent_id):
+ eeagent_id = str(eeagent_id)
+ resource_client = SimpleResourceAgentClient(eeagent_id, name=eeagent_id, process=self.process)
+ return ExecutionEngineAgentClient(resource_client)
def launch_process(self, eeagent, upid, round, run_type, parameters):
client = self._get_client_for_eeagent(eeagent)
@@ -908,14 +883,14 @@ def _heartbeat_callback(self, heartbeat, headers):
log.debug("Got EEAgent heartbeat. headers=%s msg=%s", headers, heartbeat)
try:
- resource_id = heartbeat['resource_id']
+ eeagent_id = heartbeat['eeagent_id']
beat = heartbeat['beat']
except KeyError, e:
log.warn("Invalid EEAgent heartbeat received. Missing: %s -- %s", e, heartbeat)
return
try:
- self.core.ee_heartbeat(resource_id, beat)
+ self.core.ee_heartbeat(eeagent_id, beat)
except (NotFound, ResourceNotFound, ServerError):
# This exception catches a race condition, where:
# 1. EEagent spawns and starts heartbeater
@@ -923,10 +898,9 @@ def _heartbeat_callback(self, heartbeat, headers):
# 3. PD recieves heartbeat and tries to send a message but EEAgent,
# hasn't been registered yet
log.exception("Problem processing heartbeat from eeagent")
- except:
+ except Exception:
log.exception("Unexpected error while processing heartbeat")
-
def create_definition(self, definition, definition_id=None):
"""
@type definition: ProcessDefinition
Please sign in to comment.
Something went wrong with that request. Please try again.