Skip to content
Browse files

add computed directory to jms agent reply

  • Loading branch information...
ssadedin committed Jun 29, 2019
1 parent 30fe684 commit ddc175cbe936fbe27075d49ef3b3c62f231eb701
@@ -106,5 +106,43 @@ is received, it will respond by sending a message to the queue specified in the
This message will have a JSON body containing information about the agent, and receiving it can be used to confirm
that the agent is alive and processing messages.

## Handling Pipeline Completion

It can be useful to coordinate downstream actions when the pipeline completes running. For this purpose, Bpipe will observe the `reply-to` or `JMSReplyTo`
property of messages. When a pipeline initiated by the agent completes, if
one of these properties is set, Bpipe will send a message to the corresponding
queue as a reply. In such a message, if a correlation id is set, then the message
will have the same correlation id.

This capability is designed to interoperate with frameworks such as
[Apache Camel]( which can route messages through
predefined workflows using this system. For example, a Camel route could be
defined using the Groovy DSL to run a pipeline in response to a message
and then process the results:

.transform { e, c ->
"command" : "run",
"arguments": [
"pipeline/batch.groovy", // the file to analyse
] +
"directory": "/some/path/on/your/system"
.to('activemq:run_bpipe?requestTimeout=720000') // 2 hour timeout
.process { e ->
println "The results from the pipeline were: $"

Note that the `inOut` automatically handles the correlation id and reply-to headers and waits for the reply. The bpipe agent, in this case, would be configured to
listen on the `run_bpipe` queue.

@@ -1,6 +1,8 @@
package bpipe.agent

import bpipe.PipelineError
import bpipe.cmd.BpipeCommand
import bpipe.cmd.RunPipelineCommand
import bpipe.worx.JMSWorxConnection
import bpipe.worx.WorxConnection
import groovy.json.JsonOutput
@@ -142,12 +144,17 @@ class JMSAgent extends Agent { "ReplyTo set on message: will send message when complete"
runner.completionListener = { result -> "Sending reply for command $"
sendReply(message, JsonOutput.prettyPrint(JsonOutput.toJson(

BpipeCommand command = runner.command
Map resultDetails = [
command: commandAttributes,
result: result

if(command instanceof RunPipelineCommand) { = command.runDirectory?.canonicalPath
sendReply(message, JsonOutput.prettyPrint(JsonOutput.toJson(resultDetails )))

@@ -36,6 +36,8 @@ class RunPipelineCommand extends BpipeCommand {

ExecutedProcess result

File runDirectory

public RunPipelineCommand(List<String> args) {
super("run", args);
@@ -55,13 +57,16 @@ class RunPipelineCommand extends BpipeCommand {
throw new IllegalArgumentException("Unable to create directory requested for pipeline run $dir.") "Running with arguments: " + args; "Running with arguments: " + args + " in directory " + dirFile;

this.runDirectory = dirFile

List<String> cmd = [ bpipe.Runner.BPIPE_HOME + "/bin/bpipe", "run" ]
result = Utils.executeCommand(cmd, out:out, err: out) {



0 comments on commit ddc175c

Please sign in to comment.
You can’t perform that action at this time.