Automatic task cleanup #3849

Draft · bentsherman wants to merge 40 commits into master from 452-eager-cleanup

Commits (40)
47d0168
Add initial task graph and metadata json file
bentsherman Mar 27, 2023
ae67027
Add task inputs and outputs to concrete DAG
bentsherman Mar 28, 2023
8f95cd6
Fix failing tests
bentsherman Mar 28, 2023
9f11e4b
Use path-based APIs to get file metadata
bentsherman Mar 29, 2023
db6aed1
Merge branch 'master' into ben-task-graph
bentsherman Mar 29, 2023
8456892
Use buffer to compute checksum
bentsherman Mar 30, 2023
77f2cdc
Add support for temporary output paths
bentsherman Mar 31, 2023
3e55ad5
Fix failing test
bentsherman Mar 31, 2023
e307f75
Add caveat about overlapping output channels [ci skip]
bentsherman Mar 31, 2023
08881b0
Delete files instead of emptying them (now supports directories and r…
bentsherman Apr 1, 2023
0cf07ec
Add `eager` cleanup option
bentsherman Apr 10, 2023
73b2f3b
Fix concurrency issues [ci fast]
bentsherman Apr 10, 2023
0dd98d6
Merge branch 'master' into ben-task-graph-pull
bentsherman Apr 21, 2023
0f505d3
Merge branch 'master' into ben-task-graph-pull
bentsherman Apr 26, 2023
e81e584
Replace synchronized with lock
bentsherman Apr 26, 2023
ba2e7a6
Merge branch 'ben-task-graph-pull' into 452-eager-cleanup
bentsherman Apr 28, 2023
f7bcfa8
Remove dependency on task graph branch
bentsherman Apr 28, 2023
4d90e27
Use downstream tasks to determine lifetime for task cleanup
bentsherman Apr 28, 2023
dd23b2a
Rename TemporaryFileObserver to TaskCleanupObserver
bentsherman Jul 7, 2023
6a34be6
Merge branch 'master' into 452-eager-cleanup
bentsherman Jul 7, 2023
ff08984
Wait for output files to be published
bentsherman Jul 7, 2023
9b343b6
Log warning if eager cleanup is used with incompatible publish modes
bentsherman Jul 7, 2023
6b5a820
Add eager cleanup for individual output files
bentsherman Jul 7, 2023
c42f249
Add warning about includeInputs with eager cleanup
bentsherman Jul 10, 2023
8b66c39
Add resumability
bentsherman Jul 11, 2023
43c939d
Upgrade kryo to 5.4.0 (#3562)
bentsherman Jul 11, 2023
5a78cf9
Add serializer for HashCode
bentsherman Jul 11, 2023
7b02b8e
Fix failing tests
bentsherman Jul 11, 2023
c727c05
Revert Kryo upgrade
bentsherman Jul 12, 2023
d37ee1f
Add kryo instantiator for HashCode
bentsherman Jul 12, 2023
859e96b
Remove HashCode serializer
bentsherman Jul 12, 2023
c89811c
Add log messages, fix infinite hang
bentsherman Jul 12, 2023
a034f60
Add support for includeInputs
bentsherman Jul 12, 2023
9741b1c
Update log messages
bentsherman Jul 12, 2023
3aae1af
Add lazy, eager, aggressive cleanup strategies
bentsherman Jul 12, 2023
08342dd
Merge branch 'master' into 452-eager-cleanup
bentsherman Sep 13, 2023
3498c5d
Add thread pool for task cleanup
bentsherman Sep 14, 2023
c421617
Minor improvement to checkCachedOutput()
bentsherman Sep 14, 2023
5b05aad
Merge branch 'master' into 452-eager-cleanup
bentsherman Jan 31, 2024
9ce10dc
minor edits
bentsherman Feb 1, 2024
29 changes: 25 additions & 4 deletions docs/config.md
@@ -1579,11 +1579,32 @@ Read the {ref}`trace-report` page to learn more about the execution report that
There are additional variables that can be defined within a configuration file that do not have a dedicated scope.

`cleanup`
: If `true`, on successful completion of a run, all files in the *work* directory are automatically deleted.

:::{warning}
The use of the `cleanup` option will prevent the use of the *resume* feature on subsequent executions of that pipeline run. Also, be aware that deleting all scratch files can take a lot of time, especially when using a shared file system or remote cloud storage.
: :::{versionchanged} 23.10.0
Added `'lazy'`, `'eager'`, and `'aggressive'` strategies.
:::
: Automatically delete task directories using one of the following strategies:

`false` (default)
: Disable automatic cleanup.

`true`
: Equivalent to `'lazy'`.

`'lazy'`
: If a workflow completes successfully, delete all task directories.
: This strategy supports resumability for both successful and failed runs.
: Note that deleting all work directories at once can take a lot of time, especially when using a shared file system or remote cloud storage.

`'eager'`
: Delete each task directory as soon as it is no longer needed by downstream tasks.
: A task can be deleted once all of the tasks that use any of its output files have completed.
: This strategy supports resumability for both successful and failed runs.
: Output files that are published via symlink will be invalidated when the original task directory is deleted. Avoid using the following publish modes: `copyNoFollow`, `rellink`, `symlink`.

`'aggressive'`
: Equivalent to `'eager'`, but also deletes individual output files as soon as they are no longer needed.
: An output file can be deleted once all of the tasks that use it have completed; in some cases this happens before the originating task directory itself is deleted.
: This strategy supports resumability for successful runs, but not necessarily for failed runs. It is therefore recommended only when you want to minimize disk usage during the run and do not need to resume failed runs (see the configuration example below).
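
For illustration, a minimal configuration that enables eager cleanup might look like the sketch below; the `results` path is an assumption, and the `copy` mode is chosen because copy-based publish modes remain valid after task directories are deleted:

```groovy
// hypothetical nextflow.config snippet (not part of this PR)
cleanup = 'eager'

process {
    // use a copy-based publish mode: symlink-based modes are
    // invalidated once the originating task directory is deleted
    publishDir = [ path: 'results', mode: 'copy' ]
}
```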

`dumpHashes`
: If `true`, dump task hash keys in the log file, for debugging purposes.
45 changes: 14 additions & 31 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
@@ -372,6 +372,9 @@ class Session implements ISession {
// -- file porter config
this.filePorter = new FilePorter(this)

// -- normalize cleanup config
if( config.cleanup == true )
config.cleanup = 'lazy'
}

protected Path cloudCachePath(Map cloudcache, Path workDir) {
@@ -964,6 +967,17 @@
}
}

void notifyProcessClose(TaskProcessor process) {
observers.each { observer ->
try {
observer.onProcessClose(process)
}
catch( Exception e ) {
log.debug(e.getMessage(), e)
}
}
}

void notifyProcessTerminate(TaskProcessor process) {
for( int i=0; i<observers.size(); i++ ) {
final observer = observers.get(i)
@@ -1149,37 +1163,6 @@
errorAction = action
}

/**
* Delete the workflow work directory from tasks temporary files
*/
void cleanup() {
if( !workDir || !config.cleanup )
return

if( aborted || cancelled || error )
return

CacheDB db = null
try {
log.trace "Cleaning-up workdir"
db = CacheFactory.create(uniqueId, runName).openForRead()
db.eachRecord { HashCode hash, TraceRecord record ->
def deleted = db.removeTaskEntry(hash)
if( deleted ) {
// delete folder
FileHelper.deletePath(FileHelper.asPath(record.workDir))
}
}
log.trace "Clean workdir complete"
}
catch( Exception e ) {
log.warn("Failed to cleanup work dir: ${workDir.toUriString()}")
}
finally {
db.close()
}
}

@Memoized
CondaConfig getCondaConfig() {
final cfg = config.conda as Map ?: Collections.emptyMap()
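
The new `notifyProcessClose` hook above tells observers when a process will submit no more tasks, which is the point at which the full set of consumers of its outputs becomes known. A minimal sketch of an observer consuming this callback follows; the class and its bookkeeping are hypothetical, and only the `onProcessClose` callback name comes from this PR:

```groovy
import groovy.util.logging.Slf4j
import nextflow.processor.TaskProcessor
import nextflow.trace.TraceObserver

// Hypothetical observer: the real TaskCleanupObserver in this PR uses
// this signal to decide when task directories can be deleted.
@Slf4j
class ProcessCloseLogger implements TraceObserver {

    private final Set<String> closed = [] as Set

    void onProcessClose(TaskProcessor process) {
        // once a process is closed, no new tasks will be created for it
        closed << process.name
        log.debug "Process closed: ${process.name}"
    }
}
```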
45 changes: 37 additions & 8 deletions modules/nextflow/src/main/groovy/nextflow/cache/CacheDB.groovy
@@ -77,17 +77,19 @@
* @param processor The {@link TaskProcessor} instance to be assigned to the retrieved task
* @return A {@link TaskEntry} instance or {@code null} if a task for the given hash does not exist
*/
TaskEntry getTaskEntry(HashCode taskHash, TaskProcessor processor) {
TaskEntry getTaskEntry(HashCode taskHash, Map<String,TaskProcessor> processorLookup=[:]) {

final payload = store.getEntry(taskHash)
if( !payload )
return null

final record = (List)KryoHelper.deserialize(payload)
TraceRecord trace = TraceRecord.deserialize( (byte[])record[0] )
final processor = processorLookup[trace.get('process')]
TaskContext ctx = record[1]!=null && processor!=null ? TaskContext.deserialize(processor, (byte[])record[1]) : null
final consumers = record[3]!=null ? ((List<String>)record[3]).collect( s -> HashCode.fromString(s) ) : null

return new TaskEntry(trace,ctx)
return new TaskEntry(processor, trace, ctx, consumers)
}

void incTaskEntry( HashCode hash ) {
@@ -99,7 +101,7 @@

final record = (List)KryoHelper.deserialize(payload)
// third record contains the reference count for this record
record[2] = ((Integer)record[2]) +1
record[2] = ((Integer)record[2]) + 1
// save it again
store.putEntry(hash, KryoHelper.serialize(record))

@@ -114,7 +116,7 @@

final record = (List)KryoHelper.deserialize(payload)
// third record contains the reference count for this record
def count = record[2] = ((Integer)record[2]) -1
def count = record[2] = ((Integer)record[2]) - 1
// save or delete
if( count > 0 ) {
store.putEntry(hash, KryoHelper.serialize(record))
@@ -128,9 +130,10 @@


/**
* Save task runtime information to th cache DB
* Save task runtime information to the cache DB
*
* @param handler A {@link TaskHandler} instance
* @param handler
* @param trace
*/
@PackageScope
void writeTaskEntry0( TaskHandler handler, TraceRecord trace ) {
@@ -143,10 +146,11 @@
// only the 'cache' is active and
TaskContext ctx = proc.isCacheable() && task.hasCacheableValues() ? task.context : null

def record = new ArrayList(3)
final record = new ArrayList(4)
record[0] = trace.serialize()
record[1] = ctx != null ? ctx.serialize() : null
record[2] = 1
record[3] = null

// -- save in the db
store.putEntry( key, KryoHelper.serialize(record) )
@@ -157,6 +161,31 @@
writer.send { writeTaskEntry0(handler, trace) }
}

/**
* Finalize task entry in the cache DB with the list of
* consumer tasks.
*
* @param hash
* @param consumers
*/
@PackageScope
void finalizeTaskEntry0( HashCode hash, List<HashCode> consumers ) {

final payload = store.getEntry(hash)
if( !payload ) {
log.debug "Unable to finalize task with key: $hash"
return
}

final record = (List)KryoHelper.deserialize(payload)
record[3] = consumers.collect( h -> h.toString() )
store.putEntry(hash, KryoHelper.serialize(record))
}

void finalizeTaskAsync( HashCode hash, List<HashCode> consumers ) {
writer.send { finalizeTaskEntry0(hash, consumers) }
}

void cacheTaskAsync( TaskHandler handler ) {
writer.send {
writeTaskIndex0(handler,true)
@@ -224,7 +253,7 @@
}

TraceRecord getTraceRecord( HashCode hashCode ) {
final result = getTaskEntry(hashCode, null)
final result = getTaskEntry(hashCode)
return result ? result.trace : null
}

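
Read together, `getTaskEntry`, `writeTaskEntry0`, and `finalizeTaskEntry0` imply a four-slot record layout in the cache DB. The helpers below restate that layout as a sketch; the method names are hypothetical, but the slot semantics are taken directly from the diff:

```groovy
import com.google.common.hash.HashCode

// Cache record layout implied by CacheDB:
//   record[0] -> serialized TraceRecord
//   record[1] -> serialized TaskContext, or null when not cacheable
//   record[2] -> reference count (Integer)
//   record[3] -> consumer task hashes as strings, or null until finalized

List encodeRecord(byte[] trace, byte[] ctx, int refCount, List<HashCode> consumers) {
    final record = new ArrayList(4)
    record[0] = trace
    record[1] = ctx
    record[2] = refCount
    record[3] = consumers?.collect { h -> h.toString() }
    return record
}

List<HashCode> decodeConsumers(List record) {
    return record[3] != null
        ? ((List<String>) record[3]).collect { s -> HashCode.fromString(s) }
        : null
}
```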
modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy

@@ -183,7 +183,7 @@ class PublishDir

def result = new PublishDir()
if( params.path )
result.path = params.path
result.path = params.path

if( params.mode )
result.mode = params.mode
@@ -217,6 +217,18 @@
return result
}

boolean canPublish(Path source, TaskRun task) {
if( !sourceDir ) {
this.sourceDir = task.targetDir
this.sourceFileSystem = sourceDir.fileSystem
this.stageInMode = task.config.stageInMode
this.task = task
validatePublishMode()
}

return getPublishTarget(source) != null
}

protected void apply0(Set<Path> files) {
assert path

@@ -301,13 +313,8 @@

protected void apply1(Path source, boolean inProcess ) {

def target = sourceDir ? sourceDir.relativize(source) : source.getFileName()
if( matcher && !matcher.matches(target) ) {
// skip not matching file
return
}

if( saveAs && !(target=saveAs.call(target.toString()))) {
final target = getPublishTarget(source)
if( !target ) {
// skip this file
return
}
@@ -339,6 +346,17 @@

}

protected def getPublishTarget(Path source) {
def target = sourceDir ? sourceDir.relativize(source) : source.getFileName()
if( matcher && !matcher.matches(target) )
return null

if( saveAs && !(target=saveAs.call(target.toString())))
return null

return target
}

protected Path resolveDestination(target) {

if( target instanceof Path ) {
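
The refactor above extracts target resolution into `getPublishTarget`, so `canPublish` can answer whether a publish rule still wants a file without actually copying it. A hedged sketch of how eager cleanup might use it follows; the helper is hypothetical, while `canPublish` is the method added in this diff:

```groovy
import java.nio.file.Path
import nextflow.processor.PublishDir
import nextflow.processor.TaskRun

// Hypothetical guard: an output file that still matches a publish rule
// must not be deleted until publishing has completed.
boolean pendingPublish(Path output, TaskRun task, List<PublishDir> publishers) {
    return publishers.any { pub -> pub.canPublish(output, task) }
}
```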
modules/nextflow/src/main/groovy/nextflow/processor/TaskEntry.groovy

@@ -16,9 +16,11 @@

package nextflow.processor

import com.google.common.hash.HashCode
import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
import groovy.transform.TupleConstructor
import nextflow.processor.TaskProcessor
import nextflow.trace.TraceRecord
/**
* Model a task entry persisted in the {@link nextflow.cache.CacheDB}
@@ -30,8 +32,12 @@ import nextflow.trace.TraceRecord
@TupleConstructor
class TaskEntry {

TaskProcessor processor

TraceRecord trace

TaskContext context

List<HashCode> consumers

}
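
Since `TaskEntry` is annotated with `@TupleConstructor`, the expanded entry can be built positionally. A small usage sketch with hypothetical values:

```groovy
import com.google.common.hash.HashCode
import nextflow.processor.TaskEntry

// Hypothetical values; at runtime these come from the cache DB and the
// processor lookup in CacheDB.getTaskEntry()
def processor = null                                 // TaskProcessor, if still known
def trace     = null                                 // deserialized TraceRecord
def context   = null                                 // deserialized TaskContext, may be null
def consumers = [ HashCode.fromString('c42f2490') ]  // downstream task hashes

def entry = new TaskEntry(processor, trace, context, consumers)
assert entry.consumers.size() == 1
```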