Tickets/dm 8824 #4

Merged
merged 2 commits on Jan 9, 2017
182 changes: 0 additions & 182 deletions bin.src/Logger.py

This file was deleted.

6 changes: 2 additions & 4 deletions python/lsst/ctrl/orca/CondorWorkflowLauncher.py
@@ -62,15 +62,13 @@ def cleanUp(self):
"""
log.debug("CondorWorkflowLauncher:cleanUp")

def launch(self, statusListener, loggerManagers):
def launch(self, statusListener):
"""Launch this workflow

Parameters
----------
statusListener : StatusListener
status listener object
loggerManagers : list
list of all logger managers for this workflow
"""
log.debug("CondorWorkflowLauncher:launch")

@@ -90,7 +88,7 @@ def launch(self, statusListener, loggerManagers):

# workflow monitor for HTCondor jobs
self.workflowMonitor = CondorWorkflowMonitor(eventBrokerHost, shutdownTopic, self.runid,
condorDagId, loggerManagers, self.monitorConfig)
condorDagId, self.monitorConfig)

if statusListener is not None:
self.workflowMonitor.addStatusListener(statusListener)
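With loggerManagers dropped from launch(), callers now pass only the status listener. A minimal stand-alone sketch of the new calling convention; the stub classes and the notify() method below are illustrative stand-ins, not part of ctrl_orca:

    # Sketch of the post-change launch() signature; classes here are stubs,
    # not the real lsst.ctrl.orca implementations.
    class StatusListener:
        """Illustrative listener; the real interface lives in ctrl_orca."""
        def notify(self, message):
            print("status:", message)

    class CondorWorkflowLauncher:
        """Stub mirroring only the changed signature of launch()."""
        def launch(self, statusListener):
            # the real method submits the HTCondor dag and creates a
            # CondorWorkflowMonitor without any loggerManagers argument
            if statusListener is not None:
                statusListener.notify("workflow launched")

    CondorWorkflowLauncher().launch(StatusListener())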
77 changes: 12 additions & 65 deletions python/lsst/ctrl/orca/CondorWorkflowMonitor.py
@@ -48,12 +48,10 @@ class CondorWorkflowMonitor(WorkflowMonitor):
run id for this workflow
condorDagId : `str`
job id of submitted HTCondor dag
loggerManagers: [ logMgr1, logMgr2 ]
list of logger process managers
monitorConfig : Config
configuration file for monitor information
"""
def __init__(self, eventBrokerHost, shutdownTopic, runid, condorDagId, loggerManagers, monitorConfig):
def __init__(self, eventBrokerHost, shutdownTopic, runid, condorDagId, monitorConfig):

# _locked: a container for data to be shared across threads that
# have access to this object.
@@ -64,13 +62,6 @@ def __init__(self, eventBrokerHost, shutdownTopic, runid, condorDagId, loggerMan

# make a copy of this liste, since we'll be removing things.

# list of logger process ids
self.loggerPIDs = []
for lm in loggerManagers:
self.loggerPIDs.append(lm.getPID())

self.loggerManagers = loggerManagers

self._eventBrokerHost = eventBrokerHost
self._shutdownTopic = shutdownTopic

@@ -91,9 +82,6 @@ def __init__(self, eventBrokerHost, shutdownTopic, runid, condorDagId, loggerMan
# create event identifier for this process
self.originatorId = self.eventSystem.createOriginatorId()

# flag to indicate that last logger event has been sent
self.bSentLastLoggerEvent = False

with self._locked:
self._wfMonitorThread = CondorWorkflowMonitor._WorkflowMonitorThread(self,
self._eventBrokerHost,
@@ -118,8 +106,6 @@ class _WorkflowMonitorThread(threading.Thread):
run id for this workflow
condorDagId : `str`
job id of submitted HTCondor dag
loggerManagers: [ logMgr1, logMgr2 ]
list of logger process managers
monitorConfig : `Config`
configuration file for monitor information
"""
@@ -134,7 +120,6 @@ def __init__(self, parent, eventBrokerHost, shutdownTopic,

selector = "RUNID = '%s'" % runid
self._receiver = events.EventReceiver(self._eventBrokerHost, self._eventTopic, selector)
self._Logreceiver = events.EventReceiver(self._eventBrokerHost, "LoggerStatus", selector)

# the dag id assigned to this workflow
self.condorDagId = condorDagId
@@ -157,30 +142,23 @@ def run(self):
time.sleep(sleepInterval)
event = self._receiver.receiveEvent(1)

logEvent = self._Logreceiver.receiveEvent(1)

if event is not None:
# val = self._parent.handleEvent(event)
self._parent.handleEvent(event)
if not self._parent._locked.running:
print("and...done!")
return
elif logEvent is not None:
self._parent.handleEvent(logEvent)
# val = self._parent.handleEvent(logEvent)

if not self._parent._locked.running:
print("logger handled... and... done!")
return

if (event is not None) or (logEvent is not None):
if (event is not None):
sleepInterval = 0
else:
sleepInterval = statusCheckInterval
# if the dag is no longer running, send the logger an event
# telling it to clean up.
# if the dag is no longer running, return
if not cj.isJobAlive(self.condorDagId):
self._parent.sendLastLoggerEvent()
# self._parent.sendLastLoggerEvent()
print("work complete.")
with self._parent._locked:
self._parent._locked.running = False

def startMonitorThread(self, runid):
"""Begin one monitor thread
@@ -212,52 +190,21 @@ def handleEvent(self, event):

ps = event.getPropertySet()

# check for Logger event status
if event.getType() == events.EventTypes.STATUS:
ps = event.getPropertySet()

if ps.exists("logger.status"):
pid = ps.getInt("logger.pid")
log.debug("logger.pid = %s", str(pid))
if pid in self.loggerPIDs:
self.loggerPIDs.remove(pid)

# if the logger list is empty, we're finished.
if len(self.loggerPIDs) == 0:
with self._locked:
self._locked.running = False
elif event.getType() == events.EventTypes.COMMAND:
# TODO: stop this thing right now.
# that means the logger and the dag.
# check for event type
if event.getType() == events.EventTypes.COMMAND:
# stop this thing right now.
# that means the dag process
with self._locked:
self._locked.running = False
else:
print("didn't handle anything")

def sendLastLoggerEvent(self):
"""Send a message to the logger that we're done
"""
# only do this one time
if not self.bSentLastLoggerEvent:
print("sending last Logger Event")
transmitter = events.EventTransmitter(self._eventBrokerHost, events.LogEvent.LOGGING_TOPIC)

props = PropertySet()
props.set("LOGGER", "orca.control")
props.set("STATUS", "eol")

e = events.Event(self.runid, props)
transmitter.publishEvent(e)

self.bSentLastLoggerEvent = True

def stopWorkflow(self, urgency):
"""Stop the workflow
"""
log.debug("CondorWorkflowMonitor:stopWorkflow")

# do a condor_rm on the cluster id for the dag we submitted.
print("shutdown request received: stopping workflow")
cj = CondorJobs()
cj.killCondorId(self.condorDagId)

self.sendLastLoggerEvent()
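
With the logger plumbing removed, the monitor thread's run() loop reduces to: poll the event receiver, shorten the sleep interval when an event arrived, and stop once the dag is no longer alive. A self-contained sketch of that control flow, with the event receiver and HTCondor job check replaced by plain callables (these names are illustrative, not the lsst.ctrl.events or CondorJobs API):

    import time

    def monitor_loop(receive_event, job_is_alive, status_check_interval=5):
        """Stand-in for the simplified _WorkflowMonitorThread.run() control flow."""
        sleep_interval = 0
        while True:
            time.sleep(sleep_interval)
            event = receive_event()              # returns None when nothing arrived
            if event is not None:
                # the real thread dispatches to the parent's handleEvent() here
                sleep_interval = 0
            else:
                sleep_interval = status_check_interval
            # once the dag is no longer running, the thread simply returns
            if not job_is_alive():
                print("work complete.")
                return

    # toy usage: no events arrive and the "dag" finishes after three checks
    state = {"checks": 3}
    def fake_receive():
        return None
    def fake_alive():
        state["checks"] -= 1
        return state["checks"] > 0

    monitor_loop(fake_receive, fake_alive, status_check_interval=0)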