Skip to content

Commit

Permalink
Merge branch 'carlos_refactor_branch' into make_get_uptime_generic
Browse files Browse the repository at this point in the history
  • Loading branch information
swarbhanu committed Feb 26, 2013
2 parents 79e3469 + a6cfb0d commit c8006fb
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 105 deletions.
2 changes: 1 addition & 1 deletion extern/coverage-model
Submodule coverage-model updated 1 files
+2 −2 setup.py
2 changes: 1 addition & 1 deletion extern/ion-definitions
36 changes: 15 additions & 21 deletions ion/agents/cei/execution_engine_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@

from pyon.agent.simple_agent import SimpleResourceAgent
from pyon.core.exception import Unauthorized, NotFound
from pyon.core import bootstrap
from pyon.public import IonObject, log
from pyon.util.containers import get_safe
from pyon.agent.simple_agent import SimpleResourceAgentClient
from pyon.public import log
from pyon.net.endpoint import Publisher

from interface.objects import AgentCommand
Expand All @@ -19,7 +16,7 @@
from eeagent.eeagent_exceptions import EEAgentUnauthorizedException
from pidantic.pidantic_exceptions import PIDanticExecutionException
except ImportError:
EEAgentCore = None
EEAgentCore = None # noqa

"""
@package ion.agents.cei.execution_engine_agent
Expand Down Expand Up @@ -53,20 +50,19 @@ def on_init(self):
# TODO: Fail fast here?
log.error("No launch_type.name specified")

self._factory = get_exe_factory(launch_type_name, self.CFG,
pyon_container=self.container, log=log)
self._factory = get_exe_factory(
launch_type_name, self.CFG, pyon_container=self.container, log=log)

# TODO: Allow other core class?
self.core = EEAgentCore(self.CFG, self._factory, log)


interval = self.CFG.eeagent.get('heartbeat', DEFAULT_HEARTBEAT)
if interval > 0:
self.heartbeater = HeartBeater(self.CFG, self._factory, self.resource_id, self, log=log)
self.heartbeater = HeartBeater(
self.CFG, self._factory, self.resource_id, self, log=log)
self.heartbeater.poll()
self.heartbeat_thread = looping_call(0.1, self.heartbeater.poll)
else:
print "PDA: no heartbeat"
self.heartbeat_thread = None

def on_quit(self):
Expand Down Expand Up @@ -112,7 +108,8 @@ def __init__(self, CFG, factory, process_id, process, log=logging):
self._publisher = Publisher()
self._pd_name = CFG.eeagent.get('heartbeat_queue', 'heartbeat_queue')

self._factory.set_state_change_callback(self._state_change_callback, None)
self._factory.set_state_change_callback(
self._state_change_callback, None)
self._first_beat()

def _first_beat(self):
Expand All @@ -130,16 +127,10 @@ def _eea_started(self):
if self._started:
return True

try:
_eea_pyon_client = SimpleResourceAgentClient(self.process_id, process=self.process)
eea_client = ExecutionEngineAgentClient(_eea_pyon_client)
eea_client.dump_state()
if all(self.process._process.heartbeat()):
self._started = True
return True
except NotFound:
return False
except Exception:
self._log.exception("Couldn't get eeagent state. Perhaps it is broken?")
else:
return False

def poll(self):
Expand All @@ -155,7 +146,9 @@ def poll(self):
def beat(self):
try:
beat = make_beat_msg(self._factory, self._CFG)
message = dict(beat=beat, eeagent_id=self.process_id, resource_id=self._CFG.agent.resource_id)
message = dict(
beat=beat, eeagent_id=self.process_id,
resource_id=self._CFG.agent.resource_id)
to_name = self._pd_name

if self._log.isEnabledFor(logging.DEBUG):
Expand All @@ -164,7 +157,8 @@ def beat(self):
processes_str = "processes=%d" % len(processes)
else:
processes_str = ""
self._log.debug("Sending heartbeat to %s %s", self._pd_name, processes_str)
self._log.debug("Sending heartbeat to %s %s",
self._pd_name, processes_str)

self._publisher.publish(message, to_name=to_name)
except Exception:
Expand Down
Loading

0 comments on commit c8006fb

Please sign in to comment.