Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Use eeagent direct queue for messaging from PD #394

Merged
merged 1 commit into from

1 participant

@labisso
Collaborator

Part of OOIION-581.

This keeps a consistent queue name for eeagents across container restarts.

@labisso merged commit a82caf9
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Dec 7, 2012
  1. @labisso

    Use eeagent direct queue for messaging from PD

    labisso authored
    Part of OOIION-581
This page is out of date. Refresh to see the latest.
View
9 ion/agents/cei/execution_engine_agent.py
@@ -60,7 +60,7 @@ def on_init(self):
interval = self.CFG.eeagent.get('heartbeat', DEFAULT_HEARTBEAT)
if interval > 0:
- self.heartbeater = HeartBeater(self.CFG, self._factory, log=log)
+ self.heartbeater = HeartBeater(self.CFG, self._factory, self.id, log=log)
self.heartbeater.poll()
self.heartbeat_thread = looping_call(0.1, self.heartbeater.poll)
else:
@@ -93,7 +93,7 @@ def rcmd_dump_state(self):
class HeartBeater(object):
- def __init__(self, CFG, factory, log=logging):
+ def __init__(self, CFG, factory, process_id, log=logging):
self._log = log
self._log.log(logging.DEBUG, "Starting the heartbeat thread")
@@ -103,6 +103,7 @@ def __init__(self, CFG, factory, log=logging):
self._res = None
self._done = False
self._factory = factory
+ self.process_id = process_id
self._publisher = Publisher()
self._pd_name = CFG.eeagent.get('heartbeat_queue', 'heartbeat_queue')
@@ -129,11 +130,11 @@ def poll(self):
def beat(self):
try:
beat = make_beat_msg(self._factory, self._CFG)
- message = dict(beat=beat, resource_id=self._CFG.agent.resource_id)
+ message = dict(beat=beat, eeagent_id=self.process_id, resource_id=self._CFG.agent.resource_id)
to_name = self._pd_name
self._log.debug("Send heartbeat: %s to %s", message, self._pd_name)
self._publisher.publish(message, to_name=to_name)
- except:
+ except Exception:
self._log.exception("beat failed")
View
46 ion/services/cei/process_dispatcher_service.py
@@ -125,7 +125,6 @@ def await(self, timeout=0):
self.process_id,
timeout)
-
return ret or self.last_chance
def _get_last_chance(self):
@@ -134,6 +133,7 @@ def _get_last_chance(self):
def _get_first_chance(self):
return self.first_chance
+
class ProcessDispatcherService(BaseProcessDispatcherService):
# Implementation notes:
@@ -389,6 +389,7 @@ def _get_process_name(self, process_definition, configuration):
return name
+
class PDDashiHandler(object):
"""Dashi messaging handlers for the Process Dispatcher"""
@@ -691,6 +692,7 @@ def list(self):
ProcessStateEnum.REJECTED: "900-REJECTED"
}
+
class Notifier(object):
"""Sends Process state notifications via ION events
@@ -750,38 +752,11 @@ class AnyEEAgentClient(object):
def __init__(self, process):
self.process = process
- # it's ok to cache these clients indefinitely. Longer term we may need
- # to prune this cache if we are dealing with many eeagents that come and
- # go.
- self.cache = {}
-
- def _get_client_for_eeagent(self, resource_id, attempts=60):
-
- cached = self.cache.get(resource_id)
- if cached:
- return cached
-
- exception = None
- for i in range(0, attempts):
- try:
- resource_client = SimpleResourceAgentClient(resource_id, process=self.process)
- client = ExecutionEngineAgentClient(resource_client)
- self.cache[resource_id] = client
- return client
- except (NotFound, ResourceNotFound, ServerError), e:
- # This exception catches a race condition, where:
- # 1. EEagent spawns and starts heartbeater
- # 2. heartbeat gets sent
- # 3. PD recieves heartbeat and tries to send a message but EEAgent,
- # hasn't been registered yet
- #
- # So, we try it a few times hoping that it'll come up
- log.exception("Couldn't get eeagent client")
- gevent.sleep(1)
- exception = e
- else:
- raise exception
+ def _get_client_for_eeagent(self, eeagent_id):
+ eeagent_id = str(eeagent_id)
+ resource_client = SimpleResourceAgentClient(eeagent_id, name=eeagent_id, process=self.process)
+ return ExecutionEngineAgentClient(resource_client)
def launch_process(self, eeagent, upid, round, run_type, parameters):
client = self._get_client_for_eeagent(eeagent)
@@ -908,14 +883,14 @@ def _heartbeat_callback(self, heartbeat, headers):
log.debug("Got EEAgent heartbeat. headers=%s msg=%s", headers, heartbeat)
try:
- resource_id = heartbeat['resource_id']
+ eeagent_id = heartbeat['eeagent_id']
beat = heartbeat['beat']
except KeyError, e:
log.warn("Invalid EEAgent heartbeat received. Missing: %s -- %s", e, heartbeat)
return
try:
- self.core.ee_heartbeat(resource_id, beat)
+ self.core.ee_heartbeat(eeagent_id, beat)
except (NotFound, ResourceNotFound, ServerError):
# This exception catches a race condition, where:
# 1. EEagent spawns and starts heartbeater
@@ -923,10 +898,9 @@ def _heartbeat_callback(self, heartbeat, headers):
# 3. PD receives heartbeat and tries to send a message, but EEAgent
# hasn't been registered yet
log.exception("Problem processing heartbeat from eeagent")
- except:
+ except Exception:
log.exception("Unexpected error while processing heartbeat")
-
def create_definition(self, definition, definition_id=None):
"""
@type definition: ProcessDefinition
Something went wrong with that request. Please try again.