Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'spawn'

  • Loading branch information...
commit 6d42d151f4c325ad31ce3a3f93b04e3faf6696c4 2 parents 6c24334 + ea89bb6
@oldpatricka oldpatricka authored
Showing with 72 additions and 49 deletions.
  1. +54 −40 pidantic/pyon/pidpyon.py
  2. +18 −9 pidantic/pyon/pyon.py
View
94 pidantic/pyon/pidpyon.py
@@ -1,12 +1,16 @@
import logging
-from pidantic.pyon.pyon import Pyon
+from pidantic.pyon.pyon import Pyon, FAILED_PROCESS
from pidantic.ui import PidanticFactory
from pidantic.pidbase import PIDanticStateMachineBase
from pidantic.pidantic_exceptions import PIDanticUsageException # , PIDanticExecutionException
from pidantic.pyon.persistence import PyonDB
+try:
+ from interface.objects import ProcessStateEnum
+except ImportError:
+ ProcessStateEnum = object()
class PyonPidanticFactory(PidanticFactory):
@@ -103,13 +107,10 @@ def _mark_all_failed(self):
pidpyon._process_state_change(state)
def poll(self):
-
all_procs = self._pyon.get_all_procs()
-
for name, pidpyon in self._watched_processes.iteritems():
-
- state = all_procs.get(pidpyon._program_object.pyon_process_id)
- pidpyon._process_state_change(state)
+ pyon_process = all_procs.get(pidpyon._program_object.pyon_process_id)
+ pidpyon._process_state_change(pyon_process)
def terminate(self):
self._pyon.terminate()
@@ -120,6 +121,8 @@ class PIDanticPyon(PIDanticStateMachineBase):
def __init__(self, program_object, pyon, log=logging, use_channel=False,
channel_is_stdio=False):
self._program_object = program_object
+ self._pyon_id = None
+ self._callback_state = None
self._pyon = pyon
self._exit_code = None
self._exception = None
@@ -127,6 +130,11 @@ def __init__(self, program_object, pyon, log=logging, use_channel=False,
PIDanticStateMachineBase.__init__(self, log=log,
use_channel=use_channel, channel_is_stdio=channel_is_stdio)
+ self._pyon._container.proc_manager.add_proc_state_changed_callback(self._pyon_process_state_change_callback)
+
+ def set_pyon_id(self, pyon_id):
+ self._pyon_id = pyon_id
+
def get_error_message(self):
return str(self._exception)
@@ -145,7 +153,7 @@ def sm_state_changed(self, new_state):
def sm_starting(self):
self._log.info("%s Starting" % (self._program_object.process_name))
- self._pyon.run_process(self._program_object)
+ self._pyon.run_process(self._program_object, pyon_id_callback=self.set_pyon_id)
def sm_request_canceled(self):
self._log.info("%s request canceled" % (
@@ -219,44 +227,50 @@ def has_stdout(self):
def has_stderr(self):
pass
+ def _pyon_process_state_change_callback(self, process, state, container):
+ """This callback is used by pyon for notifying us about state change
+ events in the process we've started. The value of the state is saved in
+ memory, and is picked up the next time the _process_state_change is called,
+ which is triggered by an eeagent heartbeat.
+ """
+
+ if not hasattr(process, 'id'):
+ # Process is in Pending state, which we ignore, because we don't
+ # Have a process id for it
+ return
+
+ if self._pyon_id != process.id:
+ return
+
+ if state == ProcessStateEnum.FAILED:
+ self._callback_state = 'EVENT_EXITED'
+ self._exit_code = 100
+ elif state == ProcessStateEnum.TERMINATED:
+ self._callback_state = 'EVENT_EXITED'
+ self._exit_code = 0
+ elif state == ProcessStateEnum.EXITED:
+ self._callback_state = 'EVENT_EXITED'
+ self._exit_code = 0
+ else:
+ self._log.log(logging.WARNING, "%s has an unknown state %s. Process isn't running?" % (self._pyon_id, ProcessStateEnum._str_map[state]))
+
def _process_state_change(self, pyon_proc):
event = None
- #self._log.log(logging.INFO, "%s (%s) received pyon event %s" % (self._program_object.process_name, self._program_object.command, state_name))
- if not pyon_proc and self._exit_code:
- event = "EVENT_EXITED"
- # Keep existing non-zero exit code
- elif not pyon_proc:
- # Process is missing, so exited
- event = "EVENT_EXITED"
- self._exit_code = 0
- elif pyon_proc and pyon_proc.running:
+ if self._pyon_id == FAILED_PROCESS:
+ event = 'EVENT_EXITED'
+ self._exit_code = 100
+ elif not pyon_proc and self._exit_code:
+ event = 'EVENT_EXITED'
+ elif self._pyon_id and not self._program_object.pyon_process_id:
+ self._program_object.pyon_process_id = self._pyon_id
+ self._pyon._pyon_db.db_commit()
event = "EVENT_RUNNING"
+ elif self._callback_state is not None:
+ event = self._callback_state
+ self._callback_state = None
else:
- self._log.log(logging.WARNING, "%s (%s) has an unknown state. Process isn't running?" % (self._program_object.process_name, self._program_object.command))
-
- #if state_name == "STOPPED":
- #event = "EVENT_STOPPED"
- #elif state_name == "STARTING":
- ## this is a restart or the first start.ignore
- #pass
- #elif state_name == "RUNNING":
- #event = "EVENT_RUNNING"
- #elif state_name == "STOPPING":
- # ignore this one and wait for stop event
- #pass
- #elif state_name == "EXITED":
- #self._exit_code = exit_status
- #event = "EVENT_EXITED"
- #elif state_name == "FATAL":
- #self._exit_code = 200
- #self._exception = "Fatal from supd"
- #event = "EVENT_EXITED"
- #elif state_name == "UNKNOWN":
- #event = "EVENT_FAULT"
- #elif state_name == "BACKOFF":
- #event = "EVENT_EXITED"
- #self._exit_code = 100
+ self._log.log(logging.WARNING, "%s has an unknown state. %s?" % (self._program_object.process_name, pyon_proc))
if event:
self._send_event(event)
View
27 pidantic/pyon/pyon.py
@@ -6,11 +6,13 @@
from imp import load_source
from urllib import urlretrieve
from uuid import uuid4
+from gevent import spawn
from eeagent.util import unmake_id
from pidantic.pyon.persistence import PyonDataObject, PyonProcDataObject
from pidantic.pidantic_exceptions import PIDanticUsageException, PIDanticExecutionException
+FAILED_PROCESS = "failed process"
def proc_manager_lock(func):
def call(self, *args, **kwargs):
@@ -89,12 +91,7 @@ def create_process_db(self, **kwargs):
return process_object
@proc_manager_lock
- def run_process(self, process_object):
-
- try:
- config = yaml.load(process_object.config)
- except AttributeError:
- config = None
+ def run_process(self, process_object, pyon_id_callback=None, async=True):
if process_object.module_uri is not None:
module_file = self.download_module(process_object.module_uri)
@@ -102,14 +99,26 @@ def run_process(self, process_object):
process_object.module = module
self._pyon_db.db_commit()
+ if async:
+ spawn(self._run_process, process_object, pyon_id_callback=pyon_id_callback)
+ else:
+ return self._run_process(process_object)
+
+ def _run_process(self, process_object, pyon_id_callback=None):
+ try:
+ config = yaml.load(process_object.config)
+ except AttributeError:
+ config = None
+
try:
pyon_id = self._container.spawn_process(name=process_object.pyon_name,
module=process_object.module, cls=process_object.cls,
config=config)
- process_object.pyon_process_id = pyon_id
- self._pyon_db.db_commit()
- return pyon_id
+ if pyon_id_callback is not None:
+ pyon_id_callback(pyon_id)
except:
+ if pyon_id_callback is not None:
+ pyon_id_callback(FAILED_PROCESS)
self._log.exception("Problem starting pyon process %s" % process_object.pyon_name)
return None
Please sign in to comment.
Something went wrong with that request. Please try again.