Skip to content

Commit

Permalink
Fix Invalid response cause Google pipelines exec to crash #1163
Browse files Browse the repository at this point in the history
  • Loading branch information
pditommaso committed Jun 3, 2019
1 parent 623abb9 commit 7e46008
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,33 @@ class GooglePipelinesHelper {

Operation checkOperationStatus(Operation operation) {
init()
genomicsClient.projects().operations().get(operation.getName()).execute()
try {
genomicsClient.projects().operations().get(operation.getName()).execute()
}
catch( IOException e ) {
log.warn("Invalid server response fetching operation status: $operation", e)
return null
}
}

void cancelOperation(Operation operation) {
init()
genomicsClient.projects().operations().cancel(operation.getName(), new CancelOperationRequest()).execute()
try {
genomicsClient.projects().operations().cancel(operation.getName(), new CancelOperationRequest()).execute()
}
catch( IOException e ) {
log.warn("Invalid server response cancelling operation: $operation", e)
}
}

Operation runPipeline(Pipeline pipeline, Map<String,String> labels = [:]) {
init()
genomicsClient.pipelines().run(new RunPipelineRequest().setPipeline(pipeline).setLabels(labels)).execute()
try {
genomicsClient.pipelines().run(new RunPipelineRequest().setPipeline(pipeline).setLabels(labels)).execute()
}
catch( IOException e ) {
log.warn("Invalid server response running pipeline: $pipeline", e)
return null
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ import com.google.api.services.genomics.v2alpha1.model.Operation
import groovy.json.JsonOutput
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import nextflow.exception.ProcessSubmitException
import nextflow.exception.ProcessUnrecoverableException
import nextflow.processor.TaskBean
import nextflow.processor.TaskHandler
import nextflow.processor.TaskRun
import nextflow.processor.TaskStatus
import nextflow.trace.TraceRecord
import nextflow.util.Escape

/**
* Task handler for Google Pipelines.
*
Expand Down Expand Up @@ -150,7 +150,9 @@ class GooglePipelinesTaskHandler extends TaskHandler {
// note, according to the semantic of this method
// the handler status has to be changed to RUNNING either
// if the operation is still running or it has completed
def result = executor.helper.checkOperationStatus(operation)
final result = executor.helper.checkOperationStatus(operation)
if( !result )
return false
logEvents(result)

if( result!=null ) {
Expand All @@ -165,8 +167,12 @@ class GooglePipelinesTaskHandler extends TaskHandler {
boolean checkIfCompleted() {
if( !isRunning() )
return false

operation = executor.helper.checkOperationStatus(operation)

final resultOp = executor.helper.checkOperationStatus(operation)
if( !resultOp )
return false

operation = resultOp
logEvents(operation)

if (operation.getDone()) {
Expand Down Expand Up @@ -235,6 +241,8 @@ class GooglePipelinesTaskHandler extends TaskHandler {
log.trace "[GPAPI] Task created > $task.name - Request: $req"

operation = submitPipeline(req)
if( !operation )
throw new ProcessSubmitException("Failed to submit task with name: $task.name")
pipelineId = getPipelineIdFromOp(operation)
status = TaskStatus.SUBMITTED

Expand Down

1 comment on commit 7e46008

@pditommaso
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've uploaded a tentative for a patch. You give it a try using the latest snapshot as shown below:

NXF_VER=19.06.0-SNAPSHOT nextflow run ... <your cmd line options>

Please sign in to comment.