Skip to content

Commit

Permalink
Improve K8s retry on transient failures [ci fast]
Browse files Browse the repository at this point in the history
This commit adds automatic retry for transient
failures caused by:
- OutOfcpu
- OutOfmemory

Credits: Lehmann-Fabian <fabian.lehmann@informatik.hu-berlin.de>

Signed-off-by: Lehmann-Fabian <fabian.lehmann@informatik.hu-berlin.de>
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
  • Loading branch information
pditommaso committed Nov 30, 2022
1 parent ab51360 commit d86ddc3
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package nextflow.exception

import groovy.transform.InheritConstructors

/**
* Exception thrown when pod cannot be scheduled due to k8s `OutOfmemory` error reason
*/
@InheritConstructors
class K8sOutOfCpuException extends RuntimeException implements ProcessRetryableException {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package nextflow.exception

import groovy.transform.InheritConstructors

/**
* Exception thrown when pod cannot be scheduled due to k8s `OutOfcpu` error reason
*/
@InheritConstructors
class K8sOutOfMemoryException extends RuntimeException implements ProcessRetryableException {
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ import groovy.transform.InheritConstructors
*/
@InheritConstructors
@CompileStatic
class NodeTerminationException extends Exception {
class NodeTerminationException extends Exception implements ProcessRetryableException {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package nextflow.exception

/**
* Exceptions implementing this interface will always lead to a retry
*/
interface ProcessRetryableException {
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package nextflow.k8s.client

import nextflow.exception.K8sOutOfCpuException
import nextflow.exception.K8sOutOfMemoryException

import javax.net.ssl.HostnameVerifier
import javax.net.ssl.HttpsURLConnection
import javax.net.ssl.SSLContext
Expand Down Expand Up @@ -500,10 +503,12 @@ class K8sClient {
def msg = "K8s pod '$podName' execution failed"
if( status.reason ) msg += " - reason: ${status.reason}"
if( status.message ) msg += " - message: ${status.message}"
final err = status.reason == 'Shutdown'
? new NodeTerminationException(msg)
: new ProcessFailedException(msg)
throw err
switch ( status.reason ) {
case 'OutOfcpu': throw new K8sOutOfCpuException(msg)
case 'OutOfmemory': throw new K8sOutOfMemoryException(msg)
case 'Shutdown': throw new NodeTerminationException(msg)
default: throw new ProcessFailedException(msg)
}
}

throw new K8sResponseException("K8s undetermined status conditions for pod $podName", resp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ import nextflow.dag.NodeMarker
import nextflow.exception.FailedGuardException
import nextflow.exception.MissingFileException
import nextflow.exception.MissingValueException
import nextflow.exception.NodeTerminationException
import nextflow.exception.ProcessRetryableException
import nextflow.exception.ProcessException
import nextflow.exception.ProcessFailedException
import nextflow.exception.ProcessUnrecoverableException
Expand Down Expand Up @@ -977,8 +977,8 @@ class TaskProcessor {
if( error instanceof Error ) throw error

// -- retry without increasing the error counts
if( task && (error.cause instanceof NodeTerminationException || error.cause instanceof CloudSpotTerminationException) ) {
if( error.cause instanceof NodeTerminationException )
if( task && (error.cause instanceof ProcessRetryableException || error.cause instanceof CloudSpotTerminationException) ) {
if( error.cause instanceof ProcessRetryableException )
log.info "[$task.hashLog] NOTE: ${error.message} -- Execution is retried"
else
log.info "[$task.hashLog] NOTE: ${error.message} -- Cause: ${error.cause.message} -- Execution is retried"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package nextflow.k8s.client

import nextflow.exception.K8sOutOfCpuException
import nextflow.exception.K8sOutOfMemoryException

import javax.net.ssl.HttpsURLConnection

import nextflow.exception.NodeTerminationException
Expand Down Expand Up @@ -673,6 +676,90 @@ class K8sClientTest extends Specification {
e.message == "K8s pod image cannot be pulled -- rpc error: code = Unknown desc = Error response from daemon: pull access denied for nextflow/foo, repository does not exist or may require 'docker login'"
}

def 'client should throw an exception when k8s is out of cpu' () {
given:
def JSON = '''
{
"kind": "Pod",
"apiVersion": "v1",
"metadata": {
"name": "nf-3b344812fe0aeb9554424bcf6caa7ffb",
"namespace": "default",
"uid": "0dd3c071-82a3-4b20-bdd4-0d34ff3d90bd",
"resourceVersion": "55320",
"creationTimestamp": "2022-09-23T13:43:48Z",
"labels": {
"app": "nextflow",
"processName": "combineFiles",
"runName": "insane-kare",
"sessionId": "uuid-85951b91-b5bc-4566-8938-fef4256c21c8",
"taskName": "combineFiles_1"
},
},
"spec": {
},
"status": {
"phase": "Failed",
"message": "Pod Node didn't have enough resource: cpu, requested: 4000, used: 2100, capacity: 6000",
"reason": "OutOfcpu",
"startTime": "2022-09-23T13:43:48Z"
}
}
'''

def client = Spy(K8sClient)
final POD_NAME = 'nf-3b344812fe0aeb9554424bcf6caa7ffb'

when:
client.podState(POD_NAME)
then:
1 * client.podStatus(POD_NAME) >> new K8sResponseJson(JSON)
def e = thrown(K8sOutOfCpuException)
e.message == "K8s pod 'nf-3b344812fe0aeb9554424bcf6caa7ffb' execution failed - reason: OutOfcpu - message: Pod Node didn't have enough resource: cpu, requested: 4000, used: 2100, capacity: 6000"
}

def 'client should throw an exception when k8s is out of memory' () {
given:
def JSON = '''
{
"kind": "Pod",
"apiVersion": "v1",
"metadata": {
"name": "nf-3b344812fe0aeb9554424bcf6caa7ffb",
"namespace": "default",
"uid": "0dd3c071-82a3-4b20-bdd4-0d34ff3d90bd",
"resourceVersion": "55320",
"creationTimestamp": "2022-09-23T13:43:48Z",
"labels": {
"app": "nextflow",
"processName": "combineFiles",
"runName": "insane-kare",
"sessionId": "uuid-85951b91-b5bc-4566-8938-fef4256c21c8",
"taskName": "combineFiles_2"
},
},
"spec": {
},
"status": {
"phase": "Failed",
"message": "Pod Node didn't have enough resource: memory, requested: 16106127360, used: 16158556160, capacity: 16778358784",
"reason": "OutOfmemory",
"startTime": "2022-09-23T13:42:59Z"
}
}
'''

def client = Spy(K8sClient)
final POD_NAME = 'nf-3b344812fe0aeb9554424bcf6caa7ffb'

when:
client.podState(POD_NAME)
then:
1 * client.podStatus(POD_NAME) >> new K8sResponseJson(JSON)
def e = thrown(K8sOutOfMemoryException)
e.message == "K8s pod 'nf-3b344812fe0aeb9554424bcf6caa7ffb' execution failed - reason: OutOfmemory - message: Pod Node didn't have enough resource: memory, requested: 16106127360, used: 16158556160, capacity: 16778358784"
}

def 'client should throw an exception when container status returns ImagePullBackOff' () {
def JSON = '''
{
Expand Down

0 comments on commit d86ddc3

Please sign in to comment.