Automatic task cleanup #3849

Draft · wants to merge 40 commits into base: master

Changes from 28 commits

Commits (40)
47d0168
Add initial task graph and metadata json file
bentsherman Mar 27, 2023
ae67027
Add task inputs and outputs to concrete DAG
bentsherman Mar 28, 2023
8f95cd6
Fix failing tests
bentsherman Mar 28, 2023
9f11e4b
Use path-based APIs to get file metadata
bentsherman Mar 29, 2023
db6aed1
Merge branch 'master' into ben-task-graph
bentsherman Mar 29, 2023
8456892
Use buffer to compute checksum
bentsherman Mar 30, 2023
77f2cdc
Add support for temporary output paths
bentsherman Mar 31, 2023
3e55ad5
Fix failing test
bentsherman Mar 31, 2023
e307f75
Add caveat about overlapping output channels [ci skip]
bentsherman Mar 31, 2023
08881b0
Delete files instead of emptying them (now supports directories and r…
bentsherman Apr 1, 2023
0cf07ec
Add `eager` cleanup option
bentsherman Apr 10, 2023
73b2f3b
Fix concurrency issues [ci fast]
bentsherman Apr 10, 2023
0dd98d6
Merge branch 'master' into ben-task-graph-pull
bentsherman Apr 21, 2023
0f505d3
Merge branch 'master' into ben-task-graph-pull
bentsherman Apr 26, 2023
e81e584
Replace synchronized with lock
bentsherman Apr 26, 2023
ba2e7a6
Merge branch 'ben-task-graph-pull' into 452-eager-cleanup
bentsherman Apr 28, 2023
f7bcfa8
Remove dependency on task graph branch
bentsherman Apr 28, 2023
4d90e27
Use downstream tasks to determine lifetime for task cleanup
bentsherman Apr 28, 2023
dd23b2a
Rename TemporaryFileObserver to TaskCleanupObserver
bentsherman Jul 7, 2023
6a34be6
Merge branch 'master' into 452-eager-cleanup
bentsherman Jul 7, 2023
ff08984
Wait for output files to be published
bentsherman Jul 7, 2023
9b343b6
Log warning if eager cleanup is used with incompatible publish modes
bentsherman Jul 7, 2023
6b5a820
Add eager cleanup for individual output files
bentsherman Jul 7, 2023
c42f249
Add warning about includeInputs with eager cleanup
bentsherman Jul 10, 2023
8b66c39
Add resumability
bentsherman Jul 11, 2023
43c939d
Upgrade kryo to 5.4.0 (#3562)
bentsherman Jul 11, 2023
5a78cf9
Add serializer for HashCode
bentsherman Jul 11, 2023
7b02b8e
Fix failing tests
bentsherman Jul 11, 2023
c727c05
Revert Kryo upgrade
bentsherman Jul 12, 2023
d37ee1f
Add kryo instantiator for HashCode
bentsherman Jul 12, 2023
859e96b
Remove HashCode serializer
bentsherman Jul 12, 2023
c89811c
Add log messages, fix infinite hang
bentsherman Jul 12, 2023
a034f60
Add support for includeInputs
bentsherman Jul 12, 2023
9741b1c
Update log messages
bentsherman Jul 12, 2023
3aae1af
Add lazy, eager, aggressive cleanup strategies
bentsherman Jul 12, 2023
08342dd
Merge branch 'master' into 452-eager-cleanup
bentsherman Sep 13, 2023
3498c5d
Add thread pool for task cleanup
bentsherman Sep 14, 2023
c421617
Minor improvement to checkCachedOutput()
bentsherman Sep 14, 2023
5b05aad
Merge branch 'master' into 452-eager-cleanup
bentsherman Jan 31, 2024
9ce10dc
minor edits
bentsherman Feb 1, 2024
15 changes: 15 additions & 0 deletions docs/config.md
@@ -1431,6 +1431,21 @@ There are additional variables that can be defined within a configuration file t
The use of the `cleanup` option will prevent the use of the *resume* feature on subsequent executions of that pipeline run. Also, be aware that deleting all scratch files can take a lot of time, especially when using a shared file system or remote cloud storage.
:::

:::{versionadded} 23.10.0
:::

Setting `cleanup = 'eager'` enables the "eager" cleanup strategy, which deletes each task directory as soon as it is no longer needed by any downstream task, rather than at the end of the workflow run. This is useful for minimizing disk usage while the workflow is running.

The lifetime of a task is determined by the downstream tasks that use the task's output files. When all of these tasks finish, the upstream task can be deleted.
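
For illustration, a minimal `nextflow.config` enabling this strategy could look like the following sketch (only the `cleanup` setting is taken from this page; the rest of your configuration is unaffected):

```groovy
// nextflow.config -- minimal sketch
cleanup = 'eager'    // delete each task directory as soon as no downstream task needs it
// cleanup = true    // alternative: delete the whole work directory after a successful run
```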

The following caveats apply when using eager cleanup:

- Eager cleanup breaks the resumability of your pipeline: if a workflow run fails, it must be restarted from the beginning, whereas with `cleanup = true` the cleanup is performed only after a successful run, so a failed run can still be resumed. Use eager cleanup only when you are confident that the workflow run will not fail.

- Output files should not be published via symlink when using eager cleanup, because the symlinks will be invalidated when the original task directory is deleted. Avoid the following publish modes: `copyNoFollow`, `rellink`, `symlink` (see the example after this list).

- Eager cleanup currently does not work properly with processes that forward input files with the `includeInputs` option. In this case, the forwarded input files will be deleted prematurely, and any process that consumes the forwarded output channel may fail or produce incorrect output.
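
As an illustration of the publishing caveat above, the following hypothetical process copies its outputs rather than symlinking them, so the published files remain valid after the task directory is removed (the process name, paths, and command are made up for this sketch):

```groovy
process ALIGN {
    // 'copy' duplicates the output into the publish directory, so the published
    // file survives deletion of the task work directory; avoid 'symlink',
    // 'rellink', and 'copyNoFollow' here, as noted above
    publishDir 'results/aligned', mode: 'copy'

    input:
    path reads

    output:
    path 'aligned.bam'

    script:
    """
    align_reads ${reads} > aligned.bam
    """
}
```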

`dumpHashes`
: If `true`, dump task hash keys in the log file, for debugging purposes.

2 changes: 1 addition & 1 deletion modules/nextflow/build.gradle
@@ -34,7 +34,7 @@ dependencies {
api "commons-codec:commons-codec:1.15"
api "commons-io:commons-io:2.11.0"
api "com.beust:jcommander:1.35"
api("com.esotericsoftware.kryo:kryo:2.24.0") { exclude group: 'com.esotericsoftware.minlog', module: 'minlog' }
api("com.esotericsoftware:kryo:5.5.0")
api('org.iq80.leveldb:leveldb:0.12')
api('org.eclipse.jgit:org.eclipse.jgit:6.5.0.202303070854-r')
api ('javax.activation:activation:1.1.1')
13 changes: 12 additions & 1 deletion modules/nextflow/src/main/groovy/nextflow/Session.groovy
@@ -938,6 +938,17 @@ class Session implements ISession {
}
}

void notifyProcessClose(TaskProcessor process) {
observers.each { observer ->
try {
observer.onProcessClose(process)
}
catch( Exception e ) {
log.debug(e.getMessage(), e)
}
}
}

void notifyProcessTerminate(TaskProcessor process) {
for( int i=0; i<observers.size(); i++ ) {
final observer = observers.get(i)
@@ -1123,7 +1134,7 @@
* Delete the workflow work directory from tasks temporary files
*/
void cleanup() {
if( !workDir || !config.cleanup )
if( !workDir || config.cleanup != true )
return

if( aborted || cancelled || error )
45 changes: 37 additions & 8 deletions modules/nextflow/src/main/groovy/nextflow/cache/CacheDB.groovy
@@ -77,17 +77,19 @@ class CacheDB implements Closeable {
* @param processor The {@link TaskProcessor} instance to be assigned to the retrieved task
* @return A {@link TaskEntry} instance or {@code null} if a task for the given hash does not exist
*/
TaskEntry getTaskEntry(HashCode taskHash, TaskProcessor processor) {
TaskEntry getTaskEntry(HashCode taskHash, Map<String,TaskProcessor> processorLookup=[:]) {

final payload = store.getEntry(taskHash)
if( !payload )
return null

final record = (List)KryoHelper.deserialize(payload)
TraceRecord trace = TraceRecord.deserialize( (byte[])record[0] )
final processor = processorLookup[trace.get('process')]
TaskContext ctx = record[1]!=null && processor!=null ? TaskContext.deserialize(processor, (byte[])record[1]) : null
final consumers = record[3]!=null ? ((List<HashCode>)record[3]) : null

return new TaskEntry(trace,ctx)
return new TaskEntry(processor, trace, ctx, consumers)
}

void incTaskEntry( HashCode hash ) {
@@ -99,7 +101,7 @@

final record = (List)KryoHelper.deserialize(payload)
// third record contains the reference count for this record
record[2] = ((Integer)record[2]) +1
record[2] = ((Integer)record[2]) + 1
// save it again
store.putEntry(hash, KryoHelper.serialize(record))

@@ -114,7 +116,7 @@

final record = (List)KryoHelper.deserialize(payload)
// third record contains the reference count for this record
def count = record[2] = ((Integer)record[2]) -1
def count = record[2] = ((Integer)record[2]) - 1
// save or delete
if( count > 0 ) {
store.putEntry(hash, KryoHelper.serialize(record))
@@ -128,9 +130,10 @@


/**
* Save task runtime information to th cache DB
* Save task runtime information to the cache DB
*
* @param handler A {@link TaskHandler} instance
* @param handler
* @param trace
*/
@PackageScope
void writeTaskEntry0( TaskHandler handler, TraceRecord trace ) {
@@ -143,10 +146,11 @@
// only the 'cache' is active and
TaskContext ctx = proc.isCacheable() && task.hasCacheableValues() ? task.context : null

def record = new ArrayList(3)
final record = new ArrayList(4)
record[0] = trace.serialize()
record[1] = ctx != null ? ctx.serialize() : null
record[2] = 1
record[3] = null

// -- save in the db
store.putEntry( key, KryoHelper.serialize(record) )
@@ -157,6 +161,31 @@
writer.send { writeTaskEntry0(handler, trace) }
}

/**
* Finalize task entry in the cache DB with the list of
* consumer tasks.
*
* @param hash
* @param consumers
*/
@PackageScope
void finalizeTaskEntry0( HashCode hash, List<HashCode> consumers ) {

final payload = store.getEntry(hash)
if( !payload ) {
log.debug "Unable to finalize task with key: $hash"
return
}

final record = (List)KryoHelper.deserialize(payload)
record[3] = consumers
store.putEntry(hash, KryoHelper.serialize(record))
}

void finalizeTaskAsync( HashCode hash, List<HashCode> consumers ) {
writer.send { finalizeTaskEntry0(hash, consumers) }
}

void cacheTaskAsync( TaskHandler handler ) {
writer.send {
writeTaskIndex0(handler,true)
@@ -224,7 +253,7 @@
}

TraceRecord getTraceRecord( HashCode hashCode ) {
final result = getTaskEntry(hashCode, null)
final result = getTaskEntry(hashCode)
return result ? result.trace : null
}

@@ -168,7 +168,7 @@ class PublishDir {

def result = new PublishDir()
if( params.path )
result.path = params.path
result.path = params.path

if( params.mode )
result.mode = params.mode
@@ -202,6 +202,19 @@
return result
}

@CompileStatic
boolean canPublish(Path source, TaskRun task) {
if( !sourceDir ) {
this.sourceDir = task.targetDir
this.sourceFileSystem = sourceDir.fileSystem
this.stageInMode = task.config.stageInMode
this.taskName = task.name
validatePublishMode()
}

return getPublishTarget(source) != null
}

@CompileStatic
protected void apply0(Set<Path> files) {
assert path
@@ -290,13 +303,8 @@
@CompileStatic
protected void apply1(Path source, boolean inProcess ) {

def target = sourceDir ? sourceDir.relativize(source) : source.getFileName()
if( matcher && !matcher.matches(target) ) {
// skip not matching file
return
}

if( saveAs && !(target=saveAs.call(target.toString()))) {
final target = getPublishTarget(source)
if( !target ) {
// skip this file
return
}
@@ -328,6 +336,18 @@

}

@CompileStatic
protected def getPublishTarget(Path source) {
def target = sourceDir ? sourceDir.relativize(source) : source.getFileName()
if( matcher && !matcher.matches(target) )
return null

if( saveAs && !(target=saveAs.call(target.toString())))
return null

return target
}

@CompileStatic
protected Path resolveDestination(target) {

@@ -16,9 +16,11 @@

package nextflow.processor

import com.google.common.hash.HashCode
import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
import groovy.transform.TupleConstructor
import nextflow.processor.TaskProcessor
import nextflow.trace.TraceRecord
/**
* Model a task entry persisted in the {@link nextflow.cache.CacheDB}
@@ -30,8 +32,12 @@ import nextflow.trace.TraceRecord
@TupleConstructor
class TaskEntry {

TaskProcessor processor

TraceRecord trace

TaskContext context

List<HashCode> consumers

}