
Add support for GCP dynamic machine type (#1198)
See issue #1190

Signed-off-by: Lisle E. Mose <lisle.mose@gmail.com>
Signed-off-by: Lisle E. Mose <lmose@email.unc.edu>
mozack authored and pditommaso committed Jun 27, 2019
1 parent e246041 commit 5c2c766
Showing 9 changed files with 98 additions and 64 deletions.
44 changes: 30 additions & 14 deletions docs/google.rst
@@ -276,8 +276,6 @@ Create a ``nextflow.config`` file in the project root directory. The config must

* Google Pipelines as Nextflow executor i.e. ``process.executor = 'google-pipelines'``.
* The Docker container images to be used to run pipeline tasks e.g. ``process.container = 'biocontainers/salmon:0.8.2--1'``.
* The Google Compute Engine machine instance type i.e. ``cloud.instanceType = 'n1-standard-1'``.
See the list of available machine types at the `this link <https://cloud.google.com/compute/docs/machine-types>`_.
* The Google Cloud `project` to run in e.g. ``google.project = 'rare-lattice-222412'``.
* The Google Cloud `region` or `zone`. You need to specify either one, **not** both. Multiple regions or zones can be
specified by separating them with a comma e.g. ``google.zone = 'us-central1-f,us-central-1-b'``.
@@ -289,10 +287,6 @@ Example::
container = 'your/container:latest'
}

cloud {
instanceType = 'n1-standard-1'
}

google {
project = 'your-project-id'
zone = 'europe-west1-b'
@@ -302,6 +296,36 @@ Example::
.. Note:: A container image must be specified to deploy the process execution. You can use a different Docker image for
each process using one or more :ref:`config-process-selectors`.

Process definition
------------------
Processes can be defined as usual; by default, the ``cpus`` and ``memory`` directives are used to instantiate a custom
machine type with the specified compute resources. If ``memory`` is not specified, 1 GB of memory is allocated per cpu.

The process ``machineType`` directive may optionally be used to specify a predefined Google Compute Platform `machine type <https://cloud.google.com/compute/docs/machine-types>`_.
If specified, this value overrides the ``cpus`` and ``memory`` directives.
If the ``cpus`` and ``memory`` directives are used, the values must comply with the allowed custom machine type `specifications <https://cloud.google.com/compute/docs/instances/creating-instance-with-custom-machine-type#specifications>`_.
Extended memory is not directly supported; however, predefined high-memory or high-cpu instances may be used via the ``machineType`` directive.

Examples::

process custom_instance_using_cpus_and_memory {
cpus 8
memory '40 GB'

"""
<Your script here>
"""
}

process predefined_instance_overrides_cpus_and_memory {
cpus 8
memory '40 GB'
machineType 'n1-highmem-8'

"""
<Your script here>
"""
}
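As a rough illustration (not part of the changed docs), the custom machine type generated from the ``cpus`` and ``memory`` directives follows a ``custom-<cpus>-<memory_mb>`` naming pattern per the handler code added in this commit; a minimal Python sketch of that derivation, with the helper name being my own:

```python
def custom_machine_type(cpus, memory_gb=None):
    """Mirror the commit's naming logic: memory is expressed in MB,
    defaulting to 1 GB (1024 MB) per cpu when not specified."""
    megabytes = int(memory_gb * 1024) if memory_gb is not None else cpus * 1024
    return f"custom-{cpus}-{megabytes}"

print(custom_machine_type(8, 40))    # first example above -> custom-8-40960
print(custom_machine_type(2))        # memory defaults to 1 GB per cpu -> custom-2-2048
```

So the first example process above would request a `custom-8-40960` machine, while the second ignores ``cpus``/``memory`` entirely and requests the predefined `n1-highmem-8`.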

Pipeline execution
------------------
@@ -337,10 +361,6 @@ For example::
}
}

cloud {
instanceType = 'n1-ultramem-160'
}

google {
project = 'your-project-id'
zone = 'europe-west1-b'
@@ -359,10 +379,6 @@ specify the local storage for the jobs computed locally::
Limitation
----------

* All processes use the same instance (machine) type specified in the configuration.
Make sure to use an instance type having enough resources to fulfill the computing resources
needed (i.e. cpus, memory and local storage) for the jobs in your pipeline.

* Currently it's not possible to specify a disk type and size different from the default ones assigned
  by the service depending on the chosen instance type.

21 changes: 20 additions & 1 deletion docs/process.rst
@@ -1175,6 +1175,7 @@ The directives are:
* `executor`_
* `ext`_
* `label`_
* `machineType`_
* `maxErrors`_
* `maxForks`_
* `maxRetries`_
@@ -1555,6 +1556,24 @@ This can be defined in the ``nextflow.config`` file as shown below::
process.ext.version = '2.5.3'


.. _process-machineType:

machineType
-----------

The ``machineType`` directive can be used to specify a predefined Google Compute Platform `machine type <https://cloud.google.com/compute/docs/machine-types>`_
when using the ``google-pipelines`` executor.
This directive is optional and, if specified, overrides the ``cpus`` and ``memory`` directives::

process googlePipelinesPredefinedMachineType {
machineType 'n1-highmem-8'

"""
<your script here>
"""
}

See also: `cpus`_ and `memory`_.

.. _process-maxErrors:

@@ -2259,4 +2278,4 @@ conditions::
'''
your_command --here
'''
}
}
@@ -60,7 +60,7 @@ class ProcessConfig implements Map<String,Object>, Cloneable {
'executor',
'ext',
'gpu',
'instanceType',
'machineType',
'queue',
'label',
'maxErrors',
@@ -63,10 +63,10 @@ class ScriptRunnerTest extends Specification {
def 'test processor config'() {

/*
* test that the *instanceType* attribute is visible in the taskConfig object
* test that the *machineType* attribute is visible in the taskConfig object
*/
when:
def runner = new TestScriptRunner( process: [executor:'nope', instanceType:'alpha'] )
def runner = new TestScriptRunner( process: [executor:'nope', machineType:'alpha'] )
def script =
'''
process simpleTask {
@@ -82,34 +82,7 @@
runner.setScript(script).execute()
then:
runner.getScriptObj().getTaskProcessor().name == 'simpleTask'
runner.getScriptObj().getTaskProcessor().config.instanceType == 'alpha'


/*
* test that the *instanceType* property defined by the task (beta)
* override the one define in the main config (alpha)
*/
when:
def runner2 = new TestScriptRunner( process: [executor:'nope', instanceType:'alpha'] )
def script2 =
'''
process otherTask {
instanceType 'beta'
input:
val x from 1
output:
stdout result
"""echo $x"""
}
'''
runner2.setScript(script2).execute()

then:
runner2.getScriptObj().getTaskProcessor().name == 'otherTask'
runner2.getScriptObj().getTaskProcessor().config.instanceType == 'beta'

runner.getScriptObj().getTaskProcessor().config.machineType == 'alpha'
}


@@ -127,7 +127,7 @@ class GooglePipelinesHelper {

protected Resources createResources(GooglePipelinesSubmitRequest req) {
configureResources(
req.instanceType,
req.machineType,
req.project,
req.zone,
req.region,
@@ -162,7 +162,7 @@ class GooglePipelinesHelper {
[ActionFlags.ALWAYS_RUN, ActionFlags.IGNORE_EXIT_STATUS])
}

Resources configureResources(String instanceType, String projectId, List<String> zone, List<String> region, String diskName, List<String> scopes = null, boolean preEmptible = false) {
Resources configureResources(String machineType, String projectId, List<String> zone, List<String> region, String diskName, List<String> scopes = null, boolean preEmptible = false) {

def disk = new Disk()
disk.setName(diskName)
@@ -172,7 +172,7 @@
serviceAccount.setScopes(scopes)

def vm = new VirtualMachine()
.setMachineType(instanceType)
.setMachineType(machineType)
.setDisks([disk])
.setServiceAccount(serviceAccount)
.setPreemptible(preEmptible)
@@ -220,4 +220,4 @@ class GooglePipelinesHelper {
return null
}
}
}
}
@@ -29,7 +29,7 @@ import groovy.transform.ToString
@ToString(includeNames = true)
class GooglePipelinesSubmitRequest {

String instanceType
String machineType

String project

@@ -34,6 +34,7 @@ import nextflow.processor.TaskRun
import nextflow.processor.TaskStatus
import nextflow.trace.TraceRecord
import nextflow.util.Escape
import nextflow.util.MemoryUnit
/**
* Task handler for Google Pipelines.
*
@@ -45,8 +46,6 @@ class GooglePipelinesTaskHandler extends TaskHandler {

static private List<String> UNSTAGE_CONTROL_FILES = [TaskRun.CMD_ERRFILE, TaskRun.CMD_OUTFILE, TaskRun.CMD_LOG, TaskRun.CMD_EXIT ]

static private String DEFAULT_INSTANCE_TYPE = 'n1-standard-1'

GooglePipelinesExecutor executor

@PackageScope GooglePipelinesConfiguration pipelineConfiguration
@@ -59,7 +58,7 @@ class GooglePipelinesTaskHandler extends TaskHandler {

private Path errorFile

private String instanceType
private String machineType

private String mountPath

@@ -93,8 +92,8 @@
//Set the mount path to be the workdir that is parent of the hashed directories.
this.mountPath = task.workDir.parent.parent.toString()

//Get the instanceType to use for this task
instanceType = executor.getSession().config.navigate("cloud.instanceType") ?: DEFAULT_INSTANCE_TYPE
//Get the machineType to use for this task
this.machineType = getProcessMachineType()

validateConfiguration()
}
@@ -109,11 +108,38 @@
if (!task.container) {
throw new ProcessUnrecoverableException("No container image specified for process $task.name -- Either specify the container to use in the process definition or with 'process.container' value in your config")
}
if (!instanceType) {
throw new ProcessUnrecoverableException("No instance type specified for process $task.name -- Please provide a 'cloud.instanceType' definition in your config")
}

//
// Use the process cpus and memory directives to create a custom GCP machine type.
// If memory is not specified, default to 1 GB per cpu; an absent cpus directive defaults to 1 cpu.
// If the process machineType directive is defined, use it instead of cpus/memory (it must be a predefined GCP machine type).
// The deprecated cloud.instanceType setting is used only when machineType is not specified.
String getProcessMachineType() {
String machineType = getProcessMachineType(executor.getSession().config.navigate("cloud.instanceType"), task.config.machineType, task.config.cpus, task.config.memory)

log.trace "[GPAPI] Task: $task.name - Instance Type: $machineType"

return machineType
}

String getProcessMachineType(String cloudInstanceType, String taskMachineType, int cpus, MemoryUnit memory) {

if (cloudInstanceType != null) {
log.warn1("Configuration setting [cloud.instanceType] is deprecated. Please use either the process cpus/memory directives or process.machineType instead.", firstOnly: true, cacheKey: GooglePipelinesTaskHandler.class)
}

String machineType = taskMachineType ?: cloudInstanceType

if (machineType == null) {
long megabytes = memory != null ? memory.mega : cpus*1024
machineType = 'custom-' + cpus + '-' + megabytes
}

return machineType
}
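The selection precedence implemented by the Groovy method above (process ``machineType`` first, then the deprecated ``cloud.instanceType``, then a generated custom type) can be restated as a small Python sketch for illustration; the function name and inputs here are my own, not part of the commit:

```python
def resolve_machine_type(cloud_instance_type, task_machine_type, cpus, memory_mb=None):
    # Process-level machineType wins; the deprecated cloud.instanceType is the
    # fallback; otherwise generate custom-<cpus>-<MB> (1024 MB per cpu default).
    machine_type = task_machine_type or cloud_instance_type
    if machine_type is None:
        megabytes = memory_mb if memory_mb is not None else cpus * 1024
        machine_type = f"custom-{cpus}-{megabytes}"
    return machine_type

print(resolve_machine_type(None, "n1-highmem-8", 8, 40960))  # machineType wins
print(resolve_machine_type("n1-standard-1", None, 1))        # deprecated fallback
print(resolve_machine_type(None, None, 4))                   # generated custom type
```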


protected void logEvents(Operation operation) {
final events = getEventsFromOp(operation)
if( !events )
@@ -294,7 +320,7 @@
sharedMount = executor.helper.configureMount(diskName, mountPath)

def req = new GooglePipelinesSubmitRequest()
req.instanceType = instanceType
req.machineType = machineType
req.project = pipelineConfiguration.project
req.zone = pipelineConfiguration.zone
req.region = pipelineConfiguration.region
@@ -335,4 +361,4 @@
JsonOutput.prettyPrint( JsonOutput.toJson(events) )
}

}
}
@@ -247,7 +247,7 @@ class GooglePipelinesHelperTest extends Specification {
when:
def res = helper.createResources(req)
then:
req.instanceType >> 'n1-abc'
req.machineType >> 'n1-abc'
req.project >> 'my-project'
req.zone >> ['my-zone']
req.region >> ['my-region']
@@ -144,7 +144,7 @@ class GooglePipelinesTaskHandlerTest extends GoogleSpecification {
pipelineConfiguration: config,
executor: executor,
task: task,
instanceType: 'n1-1234'
machineType: 'n1-1234'
)

when:
@@ -156,7 +156,7 @@ class GooglePipelinesTaskHandlerTest extends GoogleSpecification {
config.getRegion() >> ['my-region']
config.getPreemptible() >> true
// check request object
req.instanceType == 'n1-1234'
req.machineType == 'n1-1234'
req.project == 'my-project'
req.zone == ['my-zone']
req.region == ['my-region']
@@ -197,7 +197,7 @@ class GooglePipelinesTaskHandlerTest extends GoogleSpecification {
def handler = new GooglePipelinesTaskHandler(
pipelineConfiguration: config,
executor: executor,
instanceType: 'n1-1234',
machineType: 'n1-1234',
task: task,
stagingCommands: ['alpha', 'beta', 'delta'],
unstagingCommands: ['foo', 'bar']
