Skip to content

Commit

Permalink
Factor out abstract TaskCollector
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
  • Loading branch information
bentsherman committed May 10, 2024
1 parent 8ed6e4a commit 9ad9742
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 234 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@

package nextflow.executor

import java.nio.file.Path

import groovy.transform.CompileStatic
import nextflow.processor.TaskHandler
import nextflow.processor.TaskRun
/**
* Interface for executors that support job arrays.
*
Expand All @@ -29,18 +25,6 @@ import nextflow.processor.TaskRun
@CompileStatic
interface TaskArrayExecutor {

Path getWorkDir()

void submit( TaskRun task )

TaskHandler createTaskHandler(TaskRun task)

boolean isFusionEnabled()

String getChildWorkDir(TaskHandler handler)

String getChildLaunchCommand(String taskDir)

/**
* Get the environment variable name that provides the array index of a task.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package nextflow.processor

import java.nio.file.Files
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
Expand All @@ -35,106 +33,28 @@ import nextflow.util.Escape
*/
@Slf4j
@CompileStatic
class TaskArrayCollector {

/**
* The set of directives which are used by the job array.
*
* When the job array task is created, each name listed here is looked up
* in the processor config and copied into the array's own config if the
* value is not null. Directives that are not listed here are not copied.
*/
private static final List<String> SUBMIT_DIRECTIVES = [
'accelerator',
'arch',
'clusterOptions',
'cpus',
'disk',
'machineType',
'memory',
'queue',
'resourceLabels',
'resourceLimits',
'time',
// only needed for container-native executors and/or Fusion
'container',
'containerOptions',
]
class TaskArrayCollector extends TaskCollector {

/** Processor whose config and task body type are used when building the job array task. */
private TaskProcessor processor

/** Executor that receives both the individually submitted tasks and the job array tasks. */
private TaskArrayExecutor executor

/** Number of collected tasks that triggers the submission of a job array. */
private int arraySize

/** Lock held for the whole of collect() and close(); guards `array` and `closed`. */
private Lock sync = new ReentrantLock()

/** Tasks collected so far for the next job array; close() sets it to null after submitting a partial array. */
private List<TaskRun> array

/** Set to true by close(); once set, collect() submits every task directly. */
private boolean closed = false

/**
* Create a collector that groups collected tasks into job arrays.
*
* @param processor the task processor, kept for building the job array task
* @param executor the executor to submit to; it must implement TaskArrayExecutor
* @param arraySize the number of tasks per job array; also used as the
*   initial capacity of the pending task list
* @throws IllegalArgumentException if the executor is not a TaskArrayExecutor
*/
TaskArrayCollector(TaskProcessor processor, Executor executor, int arraySize) {
super(executor, arraySize)
// reject unsupported executors up front, so that the cast below cannot fail
if( executor !instanceof TaskArrayExecutor )
throw new IllegalArgumentException("Executor '${executor.name}' does not support job arrays")

this.processor = processor
this.executor = (TaskArrayExecutor)executor
this.arraySize = arraySize
// pre-size the pending list for one full array
this.array = new ArrayList<>(arraySize)
}

/**
 * Queue a task for the job array that is currently being assembled.
 *
 * The task is submitted on its own, bypassing the array, when the
 * collector has already been closed or when the task is a retry
 * (attempt greater than 1), since a retried task might have dynamic
 * resources. Otherwise it is appended to the pending list; as soon as
 * the list holds `arraySize` tasks it is submitted as one job array
 * and a new empty list is started.
 *
 * The whole operation runs while holding the collector lock.
 *
 * @param task the task run to collect
 */
void collect(TaskRun task) {
    sync.lock()
    try {
        final boolean submitAlone = closed || task.config.getAttempt() > 1
        if( !submitAlone ) {
            array.add(task)

            // flush the pending tasks as soon as the array is full
            final boolean full = array.size() == arraySize
            if( full ) {
                executor.submit(createTaskArray(array))
                array = new ArrayList<>(arraySize)
            }
        }
        else {
            // closed collector or retried task: no aggregation
            executor.submit(task)
        }
    }
    finally {
        sync.unlock()
    }
}

/**
 * Close the collector, submitting any remaining tasks.
 *
 * A single leftover task is submitted as an ordinary task; two or more
 * leftover tasks are submitted together as a partial job array. Once the
 * collector is closed, collect() submits every task directly.
 *
 * Calling this method more than once has no further effect: the first
 * call has already submitted the leftovers, so repeating the work would
 * either submit the single leftover task a second time or dereference
 * the pending list after it was set to null.
 */
void close() {
    sync.lock()
    try {
        // already closed: nothing left to submit
        if( closed )
            return

        if( array.size() == 1 ) {
            executor.submit(array.first())
        }
        else if( array.size() > 0 ) {
            executor.submit(createTaskArray(array))
            array = null
        }
        closed = true
    }
    finally {
        sync.unlock()
    }
}

/**
 * Return the executor cast to TaskArrayExecutor, for access to the
 * methods that are specific to job arrays.
 */
private TaskArrayExecutor arrayExecutor() {
    (TaskArrayExecutor)executor
}

/**
* Create the task run for a job array.
*
* @param tasks
*/
protected TaskArrayRun createTaskArray(List<TaskRun> tasks) {
@Override
protected TaskRun createAggregateTask(List<TaskRun> tasks) {
// prepare child job launcher scripts
final handlers = tasks.collect( t -> executor.createTaskHandler(t) )
for( TaskHandler handler : handlers ) {
Expand All @@ -150,22 +70,14 @@ class TaskArrayCollector {
final script = createArrayTaskScript(handlers)
log.debug "Creating task array run >> $workDir\n$script"

// create config for job array
final rawConfig = new HashMap<String,Object>(SUBMIT_DIRECTIVES.size())
for( final key : SUBMIT_DIRECTIVES ) {
final value = processor.config.get(key)
if( value != null )
rawConfig[key] = value
}

// create job array
final first = tasks.min( t -> t.index )
final taskArray = new TaskArrayRun(
id: first.id,
index: first.index,
processor: processor,
type: processor.taskBody.type,
config: new TaskConfig(rawConfig),
config: createAggregateConfig(processor),
context: new TaskContext(processor),
hash: hash,
workDir: workDir,
Expand Down Expand Up @@ -194,8 +106,8 @@ class TaskArrayCollector {
}

/**
 * Build the `${array[...]}` reference that selects an entry by the
 * executor's array index variable.
 *
 * The subscript is the index variable name reported by the executor.
 * When the executor's index start is greater than zero, the start is
 * subtracted from it (presumably to obtain a zero-based subscript --
 * confirm against the generated array script). For example, a name of
 * `IDX` with start 1 gives `${array[IDX - 1]}`, and with start 0 gives
 * `${array[IDX]}`.
 *
 * @return the reference string, emitted literally (not interpolated here)
 */
protected String getArrayIndexRef() {
    final arrayExec = arrayExecutor()
    final name = arrayExec.getArrayIndexName()
    final start = arrayExec.getArrayIndexStart()
    final index = start > 0 ? "${name} - ${start}" : name
    // single-quoted on purpose: `${array[` must reach the output verbatim
    return '${array[' + index + ']}'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@
package nextflow.processor

import java.nio.file.Files
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.executor.Executor
import nextflow.file.FileHelper
import nextflow.util.CacheHelper
import nextflow.util.Escape

/**
* Collect tasks and submit them as task batches to the underlying
* executor.
Expand All @@ -35,106 +32,25 @@ import nextflow.util.Escape
*/
@Slf4j
@CompileStatic
class TaskBatchCollector {

/**
* The set of directives which are used by the task batch.
*
* When the task batch is created, each name listed here is looked up in
* the processor config and copied into the batch's own config if the
* value is not null. Directives that are not listed here are not copied.
*/
private static final List<String> SUBMIT_DIRECTIVES = [
'accelerator',
'arch',
'clusterOptions',
'cpus',
'disk',
'machineType',
'memory',
'queue',
'resourceLabels',
'resourceLimits',
'time',
// only needed for container-native executors and/or Fusion
'container',
'containerOptions',
]
class TaskBatchCollector extends TaskCollector {

/** Processor whose config and task body type are used when building the task batch. */
private TaskProcessor processor

/** Executor that receives both the individually submitted tasks and the task batches. */
private Executor executor

/** Number of collected tasks that triggers the submission of a task batch. */
private int batchSize

/** Value given to the constructor; its use is not visible in this part of the file -- TODO confirm. */
private boolean parallel

/** Lock held for the whole of collect() and close(); guards `batch` and `closed`. */
private Lock sync = new ReentrantLock()

/** Tasks collected so far for the next batch; close() sets it to null after submitting a partial batch. */
private List<TaskRun> batch

/** Set to true by close(); once set, collect() submits every task directly. */
private boolean closed = false

/**
* Create a collector that groups collected tasks into task batches.
*
* @param processor the task processor, kept for building the task batch
* @param executor the executor that tasks and task batches are submitted to
* @param batchSize the number of tasks per batch; also used as the initial
*   capacity of the pending task list
* @param parallel stored as given; presumably selects whether the tasks of
*   a batch run concurrently -- TODO confirm against the batch script
*/
TaskBatchCollector(TaskProcessor processor, Executor executor, int batchSize, boolean parallel) {
super(executor, batchSize)
this.processor = processor
this.executor = executor
this.batchSize = batchSize
this.parallel = parallel
// pre-size the pending list for one full batch
this.batch = new ArrayList<>(batchSize)
}

/**
 * Queue a task for the batch that is currently being assembled.
 *
 * A task goes straight to the executor, without batching, when the
 * collector is already closed or when the task is a retry (attempt
 * greater than 1), since a retried task might have dynamic resources.
 * Any other task joins the pending batch, which is submitted as a whole
 * and replaced by a new empty list once it holds `batchSize` tasks.
 *
 * The whole operation runs while holding the collector lock.
 *
 * @param task the task run to collect
 */
void collect(TaskRun task) {
    sync.lock()
    try {
        final boolean direct = closed || task.config.getAttempt() > 1
        if( direct ) {
            // closed collector or retried task: submit without batching
            executor.submit(task)
        }
        else {
            batch.add(task)

            // a full batch is handed over and a new one is started
            final boolean ready = batch.size() == batchSize
            if( ready ) {
                executor.submit(createTaskBatch(batch))
                batch = new ArrayList<>(batchSize)
            }
        }
    }
    finally {
        sync.unlock()
    }
}

/**
 * Close the collector, submitting any remaining tasks.
 *
 * A single leftover task is submitted as an ordinary task; two or more
 * leftover tasks are submitted together as a partial task batch. Once the
 * collector is closed, collect() submits every task directly.
 *
 * Calling this method more than once has no further effect: the first
 * call has already submitted the leftovers, so repeating the work would
 * either submit the single leftover task a second time or dereference
 * the pending list after it was set to null.
 */
void close() {
    sync.lock()
    try {
        // already closed: nothing left to submit
        if( closed )
            return

        if( batch.size() == 1 ) {
            executor.submit(batch.first())
        }
        else if( batch.size() > 0 ) {
            executor.submit(createTaskBatch(batch))
            batch = null
        }
        closed = true
    }
    finally {
        sync.unlock()
    }
}

/**
* Create the task run for a task batch.
*
* @param tasks
*/
protected TaskRun createTaskBatch(List<TaskRun> tasks) {
@Override
protected TaskRun createAggregateTask(List<TaskRun> tasks) {
// prepare child job launcher scripts
final handlers = tasks.collect( t -> executor.createTaskHandler(t) )
for( TaskHandler handler : handlers ) {
Expand All @@ -150,22 +66,14 @@ class TaskBatchCollector {
final script = createBatchTaskScript(handlers)
log.debug "Creating task batch run >> $workDir\n$script"

// create config for task batch
final rawConfig = new HashMap<String,Object>(SUBMIT_DIRECTIVES.size())
for( final key : SUBMIT_DIRECTIVES ) {
final value = processor.config.get(key)
if( value != null )
rawConfig[key] = value
}

// create task batch
final first = tasks.min( t -> t.index )
final taskBatch = new TaskBatchRun(
id: first.id,
index: first.index,
processor: processor,
type: processor.taskBody.type,
config: new TaskConfig(rawConfig),
config: createAggregateConfig(processor),
context: new TaskContext(processor),
hash: hash,
workDir: workDir,
Expand Down

0 comments on commit 9ad9742

Please sign in to comment.