Permalink
Browse files

refactoring of even listener support to enable external monitoring

  • Loading branch information...
ssadedin committed May 31, 2016
1 parent 6ee61ee commit 6feca2ed70296e3ef89fe46cfde22efdbd99d11d
View
@@ -32,3 +32,4 @@ local-lib/gridgain-hpc-5.1.2.jar
test.out
test.src.out
+/eclipse-build/
@@ -337,19 +337,20 @@ class CommandManager {
}
public static List<Command> getCurrentCommands() {
- File commandsDir = new File(DEFAULT_COMMAND_DIR)
- List<String> statuses = [CommandStatus.RUNNING, CommandStatus.QUEUEING, CommandStatus.WAITING]*.name()
- getCommands(commandsDir, statuses)
+ getCommandsByStatus([CommandStatus.RUNNING, CommandStatus.QUEUEING, CommandStatus.WAITING])
}
- public static List<Command> getCommands(File commandsDir, List<String> statuses = null) {
+ public static List<Command> getCommandsByStatus(List<CommandStatus> statusEnums) {
List<Command> result = []
if(!commandsDir.exists()) {
log.info "No commands directory exists"
}
- commandsDir.eachFileMatch(~/[0-9]+/) { File f ->
+ File executedDir = new File(DEFAULT_EXECUTED_DIR)
+
+ List<String> statuses = statusEnums == null ? null : statusEnums*.name()
+ [executedDir, commandsDir].grep {it.exists()}*.eachFileMatch(~/[0-9]+/) { File f ->
log.info "Loading command info from $f.absolutePath"
CommandExecutor cmdExec
Command cmd
@@ -372,7 +373,7 @@ class CommandManager {
}
// Update the status
- if((statuses != null) && (status in statuses)) {
+ if(statuses == null || status in statuses) {
cmd.status = CommandStatus.valueOf(status)
result.add(cmd)
}
@@ -796,57 +796,6 @@ class Dependencies {
return outputSize
}
- /**
- * Display a dump of the dependency graph for the given files
- *
- * @param args list of file names to display dependencies for
- */
- void queryOutputs(def args) {
- // Start by scanning the output folder for dependency files
- List<OutputMetaData> outputs = scanOutputFolder()
- GraphEntry graph = computeOutputGraph(outputs)
-
- if(args) {
- for(String arg in args) {
- GraphEntry filtered = graph.filter(arg)
- if(!filtered) {
- System.err.println "\nError: cannot locate output $arg in output dependency graph"
- continue
- }
- println "\n" + " $arg ".center(Config.config.columns, "=") + "\n"
- println "\n" + filtered.dump()
-
-
- OutputMetaData p = graph.propertiesFor(arg)
-
-
- String duration = "Unknown"
- String pendingDuration = "Unknown"
-
- if(p.stopTimeMs > 0) {
- duration = TimeCategory.minus(new Date(p.stopTimeMs),new Date(p.startTimeMs)).toString()
- pendingDuration = TimeCategory.minus(new Date(p.startTimeMs),new Date(p.createTimeMs)).toString()
- }
-
- println """
- Created: ${new Date(p.timestamp)}
- Pipeline Stage: ${p.stageName?:'Unknown'}
- Pending Time: ${pendingDuration}
- Running Time: ${duration}
- Inputs used: ${p.inputs.join(',')}
- Command: ${p.command}
- Preserved: ${p.preserve?'yes':'no'}
- Intermediate output: ${p.intermediate?'yes':'no'}
- """.stripIndent()
-
- println("\n" + ("=" * Config.config.columns))
- }
- }
- else {
- println "\nDependency graph is: \n\n" + graph.dump()
- }
- }
-
void preserve(def args) {
List<OutputMetaData> outputs = scanOutputFolder()
int count = 0
@@ -665,6 +665,12 @@ public class Pipeline implements ResourceRequestor {
String failureMessage = null
try {
this.checkRequiredInputs(Utils.box(inputFile))
+
+ if(!Runner.opts.t) {
+ new File(".bpipe/run.pid").text = Config.config.pid
+ File jobFile = new File(Runner.LOCAL_JOB_DIR, Config.config.pid)
+ jobFile.append("-----------------------\npguid: " + Config.config.pguid+"\n")
+ }
runSegment(inputFile, constructedPipeline)
if(failed) {
@@ -717,7 +723,7 @@ public class Pipeline implements ResourceRequestor {
println "\nWARNING: ${failedChecks.size()} check(s) failed. Use 'bpipe checks' to see details.\n"
}
- saveResultState(failed, allChecks, failedChecks)
+ log.info "Sending FINISHED event"
EventManager.instance.signal(PipelineEvent.FINISHED, "Pipeline " + (failed?"Failed":"Succeeded"),
[
@@ -729,6 +735,8 @@ public class Pipeline implements ResourceRequestor {
commands: CommandManager.executedCommands
])
+ saveResultState(failed, allChecks, failedChecks)
+
if(!failed) {
summarizeOutputs(stages)
}
@@ -825,9 +833,35 @@ public class Pipeline implements ResourceRequestor {
// branchPoint.appendNode(p.node)
++this.childCount
+
return p
}
+ private void loadToolVariables() {
+ if('install' in Config.userConfig) {
+ ConfigObject toolsCfg = Config.userConfig.install.tools
+ for(String toolName in toolsCfg.keySet()) {
+ String toolVariable = toolName.toUpperCase()
+ if(externalBinding.variables.containsKey(toolVariable)) {
+ log.info "Skip setting tool variable $toolVariable because already defined"
+ continue
+ }
+
+ Tool tool = ToolDatabase.instance.tools[toolName]
+ if(!tool) {
+ throw new PipelineError("A tool $toolName was referenced in the install section but is not a known tool in the tool database. Please define this tool in your 'tools' section.")
+ }
+
+ if(tool.config.containsKey("installExe") && tool.config.containsKey("installPath")) {
+ File scriptParentDir = new File(Config.config.script).absoluteFile.parentFile
+ File exeFile = new File(scriptParentDir, tool.config.installPath + "/" + tool.config.installExe)
+ log.info "Setting tool variable $toolVariable automatically to $exeFile.absolutePath based on install section of config"
+ externalBinding.variables.put(toolVariable,exeFile.absolutePath)
+ }
+ }
+ }
+ }
+
private void loadExternalStages() {
GroovyShell shell = new GroovyShell(externalBinding)
def pipeFolders = [new File(System.properties["user.home"], "bpipes")]
@@ -1770,6 +1770,7 @@ class PipelineContext {
command.branch = this.branch
command.outputs = checkOutputs.unique()
+ command.stageId = this.pipelineStages[-1].id
try {
commandManager.start(stageName, command, config, Utils.box(this.input),
new File(outputDirectory), this.usedResources,
@@ -8,6 +8,9 @@
* @author ssadedin
*/
public enum PipelineEvent {
+
+ /* ----------------- Pipeline Level Events ----------------------- */
+
/**
* Supported
*/
@@ -33,16 +36,28 @@
*/
STAGE_STARTED,
+ /* ----------------- Stage Level Events ----------------------- */
+
/**
- * Supported
+ * Stage has completed execution
+ * <p>
+ * Note this is sent regardless of success or failure. To listen for
+ * failure, see {@link #STAGE_FAILED}.
*/
STAGE_COMPLETED,
/**
* Future
*/
STAGE_FAILED,
+
+ /**
+ * Bpipe is exiting - Supported
+ */
+ SHUTDOWN,
+ /* ----------------- Command Level Events ----------------------- */
+
/**
* A command is being checked to see if it needs
* to be executed. eg. are inputs older than outputs?
@@ -64,10 +79,6 @@
*/
COMMAND_FAILED,
- /**
- * Bpipe is exiting - Supported
- */
- SHUTDOWN,
/**
* A report has been generated (supported)
Oops, something went wrong.

0 comments on commit 6feca2e

Please sign in to comment.