Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set AWS container properties from container options #2471

Merged
merged 31 commits into from
Dec 12, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
c664e63
Update changelog
pditommaso Nov 30, 2021
b9008c8
Map the container options of the task into the equivalent AWS job con…
manuelesimi Dec 1, 2021
dad83bf
Adjust format.
manuelesimi Dec 2, 2021
9dce577
Set size and options for tmpfs mounts.
manuelesimi Dec 2, 2021
5e9c1cc
Add containerOptions map [ci fast]
pditommaso Oct 4, 2021
f2ef534
Merge remote-tracking branch 'origin/aws-container-options' into AWS/…
manuelesimi Dec 2, 2021
5e9ae82
Use a custom handler as command line option map. Support repeatable o…
manuelesimi Dec 3, 2021
8fa9ea7
Support command line options with dashes.
manuelesimi Dec 3, 2021
0c79646
Use the option map to set AWS container options.
manuelesimi Dec 3, 2021
e12edaa
Adjust format.
manuelesimi Dec 3, 2021
6c6cbf4
Fix misleading comment.
manuelesimi Dec 4, 2021
7cfbbb0
Add static compilation to new classses. Adapt the code to comply.
manuelesimi Dec 5, 2021
52bd6e1
Test for CmdLineOptionMap.
manuelesimi Dec 5, 2021
75c5623
Update AWS doc with supported container options.
manuelesimi Dec 5, 2021
fdcbe5b
Merge branch 'master' into aws/container_properties
manuelesimi Dec 5, 2021
963d97c
Fix some typos in the AWS cloud page.
manuelesimi Dec 6, 2021
28c5361
Merge remote-tracking branch 'origin/aws/container_properties' into A…
manuelesimi Dec 6, 2021
a75742f
Specify ulimit format.
manuelesimi Dec 6, 2021
01aa2a2
Check that the container options are not empty.
manuelesimi Dec 8, 2021
0576d5c
Resolve some glitches with one or zero container options.
manuelesimi Dec 8, 2021
522d2c7
Avoid to create unnecessary objects for AWS properties.
manuelesimi Dec 9, 2021
27e12a8
Merge branch 'master' into aws/container_properties
pditommaso Dec 10, 2021
f43be44
Aws Container options improvements
pditommaso Dec 10, 2021
80dbfa8
Fix build
pditommaso Dec 10, 2021
a724067
Remove unused code.
manuelesimi Dec 10, 2021
d1d9da0
Define a unit test for each container properties.
manuelesimi Dec 10, 2021
96c3fd4
Fix failing tests.
manuelesimi Dec 11, 2021
4c90edd
Remove invalid test [ci fast]
pditommaso Dec 12, 2021
4922034
Groovified code [ci fast]
pditommaso Dec 12, 2021
064eacd
Fix failing test
pditommaso Dec 12, 2021
397f3f0
Add required version to docs [ci skip]
pditommaso Dec 12, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package nextflow.cloud.aws.batch

import com.amazonaws.services.batch.model.ContainerProperties
import com.amazonaws.services.batch.model.KeyValuePair
import com.amazonaws.services.batch.model.LinuxParameters
import com.amazonaws.services.batch.model.Tmpfs
import com.amazonaws.services.batch.model.Ulimit

/**
 * Maps task container options to AWS container properties.
 *
 * The options string is split on whitespace, so every option and every value
 * must be a separate token (e.g. {@code --user foo}); quoted values containing
 * blanks and the {@code --name=value} form are not recognised by this parser.
 *
 * Supported options: {@code --privileged}, {@code --env}/{@code -e},
 * {@code --user}/{@code -u}, {@code --read-only}, {@code --ulimit},
 * {@code --shm-size}, {@code --tmpfs}, {@code --memory-swap}, {@code --init}
 * and {@code --memory-swappiness}.
 *
 * @see <a href="https://docs.docker.com/engine/reference/commandline/run/">Docker run</a>
 * @see <a href="https://docs.aws.amazon.com/batch/latest/APIReference/API_ContainerProperties.html">API Container Properties</a>
 * @author Manuele Simi <manuele.simi@gmail.com>
 */
class AWSContainerOptionsMapper {

    /** Whitespace-separated tokens of the container options; empty when no options were given */
    def options

    /**
     * Creates a mapper for the given container options.
     *
     * @param containerOptions the raw options string; {@code null} or blank means "no options"
     */
    protected AWSContainerOptionsMapper(String containerOptions) {
        final trimmed = containerOptions?.trim()
        // splitting an empty string would yield [""], which is not an empty option list
        options = trimmed ? trimmed.split("\\s+") : new String[0]
    }

    /**
     * Applies every supported option to the given container properties.
     *
     * @param containerProperties the properties to update in place
     * @return the same {@code containerProperties} instance
     * @throws IllegalArgumentException if an option has a missing or malformed value
     */
    protected ContainerProperties addProperties(ContainerProperties containerProperties) {
        if ( options.size() > 0 ) {
            checkPrivileged(containerProperties)
            checkEnvVars(containerProperties)
            checkUser(containerProperties)
            checkReadOnly(containerProperties)
            checkUlimit(containerProperties)
            checkLinuxParameters(containerProperties)
        }
        return containerProperties
    }

    /** Enables privileged mode when {@code --privileged} is present. */
    protected void checkPrivileged(ContainerProperties containerProperties) {
        if ( findOptionWithBooleanValue('--privileged') )
            containerProperties.setPrivileged(true)
    }

    /**
     * Sets the container environment from the {@code --env} and {@code -e} options.
     * All {@code --env} entries are added first, followed by the {@code -e} entries.
     * An entry is either {@code NAME} (no value) or {@code NAME=value}; only the
     * first {@code =} separates name and value, so values may contain {@code =}.
     *
     * @throws IllegalArgumentException if an entry has an empty variable name
     */
    protected void checkEnvVars(ContainerProperties containerProperties) {
        final keyValuePairs = new ArrayList<KeyValuePair>()
        def values = findOptionWithMultipleValues('--env')
        values.addAll(findOptionWithMultipleValues('-e'))
        values.each { String value ->
            final sep = value.indexOf('=')
            if ( sep == 0 )
                throw new IllegalArgumentException("Found a malformed value '${value}' for --env option")
            final name = sep == -1 ? value : value.substring(0, sep)
            // a missing or empty value is reported as null, i.e. a name-only variable
            final text = sep == -1 ? '' : value.substring(sep + 1)
            keyValuePairs << new KeyValuePair().withName(name).withValue(text ? text : null)
        }
        if ( keyValuePairs.size() > 0 )
            containerProperties.setEnvironment(keyValuePairs)
    }

    /** Sets the container user from {@code -u} or, when that is absent, {@code --user}. */
    protected void checkUser(ContainerProperties containerProperties) {
        def user = findOptionWithSingleValue('-u')
        if ( !user )
            user = findOptionWithSingleValue('--user')
        if ( user )
            containerProperties.setUser(user)
    }

    /** Mounts the root filesystem read-only when {@code --read-only} is present. */
    protected void checkReadOnly(ContainerProperties containerProperties) {
        if ( findOptionWithBooleanValue('--read-only') )
            containerProperties.setReadonlyRootFilesystem(true)
    }

    /**
     * Sets the ulimits from the {@code --ulimit} options, each in the form
     * {@code <name>=<soft>[:<hard>]}. When the hard limit is omitted the soft
     * limit is used for both, matching the Docker behaviour (AWS Batch
     * expects both limits to be specified).
     *
     * @throws IllegalArgumentException if a value does not match the expected form
     */
    protected void checkUlimit(ContainerProperties containerProperties) {
        final ulimits = new ArrayList<Ulimit>()
        findOptionWithMultipleValues('--ulimit').each { String value ->
            final tokens = value.tokenize('=')
            final limits = tokens.size() == 2 ? tokens[1].tokenize(':') : []
            if ( limits.isEmpty() )
                throw new IllegalArgumentException("Found a malformed value '${value}' for --ulimit option")
            final soft = limits[0] as Integer
            final hard = limits.size() > 1 ? limits[1] as Integer : soft
            ulimits << new Ulimit().withName(tokens[0]).withSoftLimit(soft).withHardLimit(hard)
        }
        if ( ulimits.size() > 0 )
            containerProperties.setUlimits(ulimits)
    }

    /**
     * Sets the Linux parameters from {@code --shm-size}, {@code --tmpfs},
     * {@code --memory-swap}, {@code --init} and {@code --memory-swappiness}.
     * Numeric values are converted with {@code as Integer}, so they must be
     * plain integers without a unit suffix.
     *
     * @throws IllegalArgumentException if a {@code --tmpfs} value is malformed
     */
    protected void checkLinuxParameters(ContainerProperties containerProperties) {
        final params = new LinuxParameters()

        // shared memory size
        def value = findOptionWithSingleValue('--shm-size')
        if ( value )
            params.setSharedMemorySize(value as Integer)

        // tmpfs mounts, e.g --tmpfs /run:rw,noexec,nosuid,size=64
        final tmpfs = new ArrayList<Tmpfs>()
        findOptionWithMultipleValues('--tmpfs').each { ovalue ->
            def matcher = ovalue =~ /^(?<path>.*):(?<options>.*?),size=(?<sizeMiB>.*)$/
            if ( !matcher.matches() )
                throw new IllegalArgumentException("Found a malformed value '${ovalue}' for --tmpfs option")
            tmpfs << new Tmpfs().withContainerPath(matcher.group('path'))
                    .withSize(matcher.group('sizeMiB') as Integer)
                    .withMountOptions(matcher.group('options').tokenize(','))
        }
        if ( tmpfs.size() > 0 )
            params.setTmpfs(tmpfs)

        // swap limit equal to memory plus swap
        value = findOptionWithSingleValue('--memory-swap')
        if ( value )
            params.setMaxSwap(value as Integer)

        // run an init inside the container; the setter expects a Boolean, not the option name
        if ( findOptionWithBooleanValue('--init') )
            params.setInitProcessEnabled(true)

        // tune container memory swappiness
        value = findOptionWithSingleValue('--memory-swappiness')
        if ( value )
            params.setSwappiness(value as Integer)

        containerProperties.setLinuxParameters(params)
    }

    /**
     * Finds the value of an option
     * @param name the name of the option
     * @return the value following the first occurrence of the option, or an
     *      empty string when the option is not present
     * @throws IllegalArgumentException if the option is present but is not
     *      followed by a valid value
     */
    protected def findOptionWithSingleValue(def name) {
        final index = options.findIndexOf { it == name }
        if ( index == -1 )
            return ''
        final value = valueAfter(index)
        if ( !isValidValue(value) )
            throw new IllegalArgumentException("Found a malformed option '${name}' for the job container")
        return value
    }

    /**
     * Finds the values of an option that can be repeated
     * @param name the name of the option
     * @return the list of values, in order of appearance; empty when the
     *      option is not present
     * @throws IllegalArgumentException if any occurrence of the option is not
     *      followed by a valid value
     */
    protected def findOptionWithMultipleValues(String name) {
        final values = new ArrayList<String>()
        for ( def index : options.findIndexValues { it == name } ) {
            final value = valueAfter(index as Integer)
            if ( !isValidValue(value) )
                throw new IllegalArgumentException("Found a malformed option ${name} for the job container")
            values << value
        }
        return values
    }

    /**
     * Checks if a boolean flag exists
     * @param name the name of the flag
     * @return true if it exists, false otherwise
     */
    protected def findOptionWithBooleanValue(def name) {
        options.any { it == name }
    }

    /**
     * Checks if the value of an option is valid
     * @param value the value to check, {@code null} when the option has no value
     * @return true if the value exists and does not look like another option
     *      (i.e. it does not start with a dash), false otherwise
     */
    protected boolean isValidValue(String value) {
        value != null && !value.startsWith('-')
    }

    /**
     * Returns the token that follows the given position
     * @param index the position of an option in the token list
     * @return the next token, or {@code null} when the option is the last token
     */
    protected String valueAfter(int index) {
        final next = index + 1
        return next < options.size() ? options[next] as String : null
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,8 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
.withCommand('true')
// note the actual command, memory and cpus are overridden when the job is executed
.withResourceRequirements( _1_cpus, _1_gb )

def mapper = new AWSContainerOptionsMapper(task.getConfig().getContainerOptions())
mapper.addProperties(container)
final jobRole = opts.getJobRole()
if( jobRole )
container.setJobRoleArn(jobRole)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package nextflow.cloud.aws.batch

import nextflow.processor.TaskConfig
import nextflow.processor.TaskRun
import spock.lang.Specification

/**
 * Checks that the container options declared in the task configuration are
 * reflected in the container properties of the job definition request
 * created by {@link AwsBatchTaskHandler}.
 *
 * @author Manuele Simi <manuele.simi@gmail.com>
 */
class AWSContainerOptionsMapperTest extends Specification {

    def 'should set env vars'() {

        given: 'a task declaring variables with both the long and the short option'
        def optionString = '--env VAR_FOO -e VAR_FOO2=value2 --env VAR_FOO3=value3'
        def taskRun = Mock(TaskRun)
        taskRun.getName() >> 'batch-task'
        taskRun.getConfig() >> new TaskConfig(containerOptions: optionString)
        and: 'a handler bound to that task'
        def batchHandler = Spy(AwsBatchTaskHandler)
        batchHandler.task >> taskRun

        when:
        def request = batchHandler.makeJobDefRequest('repo/any_image:latest')

        then:
        1 * batchHandler.getAwsOptions() >> { new AwsOptions(cliPath: '/bin/aws') }

        // the --env entries are expected before the -e ones
        def envVars = request.getContainerProperties().getEnvironment()
        envVars.size() == 3
        envVars.get(0).toString() == '{Name: VAR_FOO,}'
        envVars.get(1).toString() == '{Name: VAR_FOO3,Value: value3}'
        envVars.get(2).toString() == '{Name: VAR_FOO2,Value: value2}'
    }

    def 'should set ulimits'() {

        given: 'a task declaring two ulimits with soft and hard values'
        def optionString = '--ulimit nofile=1280:2560 --ulimit nproc=16:32'
        def taskRun = Mock(TaskRun)
        taskRun.getName() >> 'batch-task'
        taskRun.getConfig() >> new TaskConfig(containerOptions: optionString)
        and: 'a handler bound to that task'
        def batchHandler = Spy(AwsBatchTaskHandler)
        batchHandler.task >> taskRun

        when:
        def request = batchHandler.makeJobDefRequest('repo/any_image:latest')

        then:
        1 * batchHandler.getAwsOptions() >> { new AwsOptions(cliPath: '/bin/aws') }

        def containerProps = request.getContainerProperties()
        containerProps.getUlimits().size() == 2
        containerProps.getUlimits().get(0).toString() == '{HardLimit: 2560,Name: nofile,SoftLimit: 1280}'
        containerProps.getUlimits().get(1).toString() == '{HardLimit: 32,Name: nproc,SoftLimit: 16}'
    }

    def 'should set privileged, readonly and user'() {

        given: 'a task declaring two flags and a user'
        def optionString = '--privileged --read-only --user nf-user'
        def taskRun = Mock(TaskRun)
        taskRun.getName() >> 'batch-task'
        taskRun.getConfig() >> new TaskConfig(containerOptions: optionString)
        and: 'a handler bound to that task'
        def batchHandler = Spy(AwsBatchTaskHandler)
        batchHandler.task >> taskRun

        when:
        def request = batchHandler.makeJobDefRequest('repo/any_image:latest')

        then:
        1 * batchHandler.getAwsOptions() >> { new AwsOptions(cliPath: '/bin/aws') }

        request.getContainerProperties().getPrivileged() == true
        request.getContainerProperties().getReadonlyRootFilesystem() == true
        request.getContainerProperties().getUser() == 'nf-user'
    }

    def 'should set tmpfs linux params'() {

        given: 'a task declaring two tmpfs mounts'
        def optionString = '--tmpfs /run:rw,noexec,nosuid,size=64 --tmpfs /app:ro,size=128'
        def taskRun = Mock(TaskRun)
        taskRun.getName() >> 'batch-task'
        taskRun.getConfig() >> new TaskConfig(containerOptions: optionString)
        and: 'a handler bound to that task'
        def batchHandler = Spy(AwsBatchTaskHandler)
        batchHandler.task >> taskRun

        when:
        def request = batchHandler.makeJobDefRequest('repo/any_image:latest')

        then:
        1 * batchHandler.getAwsOptions() >> { new AwsOptions(cliPath: '/bin/aws') }

        def linuxParams = request.getContainerProperties().getLinuxParameters()
        linuxParams.getTmpfs().size() == 2
        linuxParams.getTmpfs().get(0).toString() == '{ContainerPath: /run,Size: 64,MountOptions: [rw, noexec, nosuid]}'
        linuxParams.getTmpfs().get(1).toString() == '{ContainerPath: /app,Size: 128,MountOptions: [ro]}'
    }

    def 'should set memory linux params'() {

        given: 'a task declaring memory related options, with a trailing blank'
        def optionString = '--memory-swappiness 90 --memory-swap 2048 --shm-size 1024 '
        def taskRun = Mock(TaskRun)
        taskRun.getName() >> 'batch-task'
        taskRun.getConfig() >> new TaskConfig(containerOptions: optionString)
        and: 'a handler bound to that task'
        def batchHandler = Spy(AwsBatchTaskHandler)
        batchHandler.task >> taskRun

        when:
        def request = batchHandler.makeJobDefRequest('repo/any_image:latest')

        then:
        1 * batchHandler.getAwsOptions() >> { new AwsOptions(cliPath: '/bin/aws') }

        def linuxParams = request.getContainerProperties().getLinuxParameters()
        linuxParams.getSharedMemorySize() == 1024
        linuxParams.getMaxSwap() == 2048
        linuxParams.getSwappiness() == 90
    }
}