Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fix spawning pyon processes in a greenlet

  • Loading branch information...
commit 7e3d006fd9042c8d69f5e9c107b9b1554faadbba 1 parent bb4ba9f
@oldpatricka oldpatricka authored
View
69 pidantic/pyon/pidpyon.py
@@ -1,12 +1,19 @@
import logging
-from pidantic.pyon.pyon import Pyon
+from pidantic.pyon.pyon import Pyon, SPAWN_REQUEST
from pidantic.ui import PidanticFactory
from pidantic.pidbase import PIDanticStateMachineBase
from pidantic.pidantic_exceptions import PIDanticUsageException # , PIDanticExecutionException
from pidantic.pyon.persistence import PyonDB
+try:
+ from interface.objects import ProcessStateEnum
+except ImportError:
+ # Note: this will never happen in a case where this module is
+ # actually used. Only in tests.
+ ProcessStateEnum = object
+
class PyonPidanticFactory(PidanticFactory):
@@ -105,11 +112,13 @@ def _mark_all_failed(self):
def poll(self):
all_procs = self._pyon.get_all_procs()
-
for name, pidpyon in self._watched_processes.iteritems():
- state = all_procs.get(pidpyon._program_object.pyon_process_id)
- pidpyon._process_state_change(state)
+ if pidpyon._program_object.pyon_process_id == SPAWN_REQUEST:
+ pidpyon._process_state_change(SPAWN_REQUEST)
+ else:
+ pyon_process = all_procs.get(pidpyon._program_object.pyon_process_id)
+ pidpyon._process_state_change(pyon_process)
def terminate(self):
self._pyon.terminate()
@@ -127,6 +136,11 @@ def __init__(self, program_object, pyon, log=logging, use_channel=False,
PIDanticStateMachineBase.__init__(self, log=log,
use_channel=use_channel, channel_is_stdio=channel_is_stdio)
+ # TODO: PDA: am I adding a ton of callbacks here?
+ self._pyon._container.proc_manager.add_proc_state_changed_callback(self._pyon_process_state_change_callback)
+ print "PDA: %s" % self._pyon._container.proc_manager._proc_state_change_callbacks
+
+
def get_error_message(self):
return str(self._exception)
@@ -219,10 +233,27 @@ def has_stdout(self):
def has_stderr(self):
pass
+ def _pyon_process_state_change_callback(self, process, state, container):
+
+ if self._program_object.pyon_process_id != process.id:
+ return
+
+ event = None
+ if state == ProcessStateEnum.ERROR:
+ event = "EVENT_EXITED"
+ self._exit_code = 100
+ elif state == ProcessStateEnum.TERMINATE:
+ event = "EVENT_EXITED"
+ self._exit_code = 0
+ else:
+ self._log.log(logging.WARNING, "%s (%s) has an unknown state. Process isn't running?" % (self._program_object.process_name, self._program_object.command))
+
+ if event:
+ self._send_event(event)
+
def _process_state_change(self, pyon_proc):
event = None
- #self._log.log(logging.INFO, "%s (%s) received pyon event %s" % (self._program_object.process_name, self._program_object.command, state_name))
if not pyon_proc and self._exit_code:
event = "EVENT_EXITED"
# Keep existing non-zero exit code
@@ -230,40 +261,20 @@ def _process_state_change(self, pyon_proc):
# Process is missing, so exited
event = "EVENT_EXITED"
self._exit_code = 0
+ elif pyon_proc and pyon_proc == SPAWN_REQUEST:
+ event = "EVENT_START_REQUEST"
elif pyon_proc and pyon_proc.running:
event = "EVENT_RUNNING"
else:
self._log.log(logging.WARNING, "%s (%s) has an unknown state. Process isn't running?" % (self._program_object.process_name, self._program_object.command))
- #if state_name == "STOPPED":
- #event = "EVENT_STOPPED"
- #elif state_name == "STARTING":
- ## this is a restart or the first start.ignore
- #pass
- #elif state_name == "RUNNING":
- #event = "EVENT_RUNNING"
- #elif state_name == "STOPPING":
- # ignore this one and wait for stop event
- #pass
- #elif state_name == "EXITED":
- #self._exit_code = exit_status
- #event = "EVENT_EXITED"
- #elif state_name == "FATAL":
- #self._exit_code = 200
- #self._exception = "Fatal from supd"
- #event = "EVENT_EXITED"
- #elif state_name == "UNKNOWN":
- #event = "EVENT_FAULT"
- #elif state_name == "BACKOFF":
- #event = "EVENT_EXITED"
- #self._exit_code = 100
-
if event:
self._send_event(event)
def poll(self):
- self._supd.poll()
+ pass
def cleanup(self):
self._pyon.remove_process(self._program_object.process_name)
+ self._pyon._container.proc_manager.remove_proc_state_changed_callback(self._pyon_process_state_change_callback)
View
16 pidantic/pyon/pyon.py
@@ -12,6 +12,7 @@
from pidantic.pyon.persistence import PyonDataObject, PyonProcDataObject
from pidantic.pidantic_exceptions import PIDanticUsageException, PIDanticExecutionException
+SPAWN_REQUEST = "spawn_request"
def proc_manager_lock(func):
@@ -93,8 +94,17 @@ def create_process_db(self, **kwargs):
@proc_manager_lock
def run_process(self, process_object, async=True):
+ if process_object.module_uri is not None:
+ module_file = self.download_module(process_object.module_uri)
+ module = self.load_module(module_file, process_object.module)
+ process_object.module = module
+ self._pyon_db.db_commit()
+
if async:
spawn(self._run_process, process_object)
+ process_object.pyon_process_id = SPAWN_REQUEST
+ self._pyon_db.db_commit()
+ return SPAWN_REQUEST
else:
return self._run_process(process_object)
@@ -104,12 +114,6 @@ def _run_process(self, process_object):
except AttributeError:
config = None
- if process_object.module_uri is not None:
- module_file = self.download_module(process_object.module_uri)
- module = self.load_module(module_file, process_object.module)
- process_object.module = module
- self._pyon_db.db_commit()
-
try:
pyon_id = self._container.spawn_process(name=process_object.pyon_name,
module=process_object.module, cls=process_object.cls,
View
1  pidantic/state_machine.py
@@ -63,6 +63,7 @@ def __init__(self, o, log=logging):
# the next mapping just meanst that the process has not yet been started
self.set_mapping(PIDanticState.STATE_STARTING, PIDanticEvents.EVENT_STOPPED, PIDanticState.STATE_STARTING, None)
self.set_mapping(PIDanticState.STATE_STARTING, PIDanticEvents.EVENT_RESTART_REQUEST, PIDanticState.STATE_STOPPING_RESTART, o.sm_restarting)
+ self.set_mapping(PIDanticState.STATE_STARTING, PIDanticEvents.EVENT_START_REQUEST, PIDanticState.STATE_STARTING, None)
self.set_mapping(PIDanticState.STATE_RUNNING, PIDanticEvents.EVENT_STOPPED, PIDanticState.STATE_TERMINATED, o.sm_stopped)
self.set_mapping(PIDanticState.STATE_RUNNING, PIDanticEvents.EVENT_EXITED, PIDanticState.STATE_EXITED, o.sm_stopped)
Please sign in to comment.
Something went wrong with that request. Please try again.